未验证 提交 93638564 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11017 from taosdata/feature/qnode

Feature/qnode
......@@ -1125,7 +1125,6 @@ int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq*
void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq);
typedef struct {
uint64_t seqId;
SQueryNodeEpId epId;
SArray* taskStatus; // SArray<STaskStatus>
} SSchedulerHbRsp;
......
......@@ -28,6 +28,7 @@ typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum {
QUERY_QUEUE,
FETCH_QUEUE,
READ_QUEUE,
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
......
......@@ -212,6 +212,10 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
// Requests handled by SCHEDULER
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
#endif
......
......@@ -112,7 +112,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName);
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
......
......@@ -48,6 +48,7 @@ typedef struct SScanLogicNode {
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
SName tableName;
bool showRewrite;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -171,6 +172,8 @@ typedef SScanPhysiNode SStreamScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
SEpSet mgmtEpSet;
bool showRewrite;
int32_t accountId;
} SSystemTableScanPhysiNode;
typedef struct STableScanPhysiNode {
......
......@@ -54,6 +54,7 @@ typedef struct SQuery {
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -29,6 +29,7 @@ typedef struct SPlanContext {
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool showRewrite;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -150,6 +150,8 @@ int32_t cleanupTaskQueue();
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
......
......@@ -28,6 +28,7 @@ enum {
NODE_TYPE_VNODE = 1,
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
NODE_TYPE_MNODE,
};
......
......@@ -52,8 +52,8 @@ typedef struct {
char user[TSDB_USER_LEN];
SRpcMsg rpcMsg;
int32_t rspLen;
void *pRsp;
void *pNode;
void * pRsp;
void * pNode;
} SNodeMsg;
typedef struct SRpcInit {
......@@ -81,13 +81,21 @@ typedef struct SRpcInit {
} SRpcInit;
typedef struct {
void * val;
int32_t len;
void (*freeFunc)(const void *arg);
void *val;
int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg);
} SRpcCtxVal;
typedef struct {
SHashObj *args;
int32_t msgType;
void *val;
int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj * args;
SRpcBrokenlinkVal brokenVal;
} SRpcCtx;
int32_t rpcInit();
......
......@@ -109,6 +109,8 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
#define TSDB_TICK_PER_SECOND(precision) \
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
......
......@@ -192,6 +192,10 @@ static void doDestroyRequest(void *p) {
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
if (pRequest->body.showInfo.pArray != NULL) {
taosArrayDestroy(pRequest->body.showInfo.pArray);
}
......
......@@ -199,7 +199,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite
};
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
......@@ -330,6 +331,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
}
return pRequest;
......
......@@ -2871,7 +2871,6 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1;
......@@ -2900,7 +2899,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->seqId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1;
if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1;
......
......@@ -134,6 +134,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}
......@@ -23,7 +23,7 @@
static void *dmThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param;
SDnode *pDnode = pMgmt->pDnode;
SDnode * pDnode = pMgmt->pDnode;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
......@@ -55,7 +55,7 @@ static void *dmThreadRoutine(void *param) {
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
SDnode *pDnode = pMgmt->pDnode;
SDnode * pDnode = pMgmt->pDnode;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode queue", pMsg);
......
......@@ -28,6 +28,7 @@ typedef struct SMnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SSingleWorker queryWorker;
SSingleWorker readWorker;
SSingleWorker writeWorker;
SSingleWorker syncWorker;
......@@ -57,11 +58,13 @@ void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_INT_H_*/
\ No newline at end of file
#endif /*_TD_DND_MNODE_INT_H_*/
......@@ -45,7 +45,8 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
......@@ -258,4 +259,4 @@ int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo
SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
}
\ No newline at end of file
}
......@@ -156,9 +156,10 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessQueryMsg, MND_VGID);
}
......@@ -44,6 +44,30 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in mnode queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
if (pRpc->msgType & 1U) {
if (pRpc->handle == NULL) return;
if (code != 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle};
dndSendRsp(pMgmt->pWrapper, &rsp);
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
......@@ -61,6 +85,10 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
......@@ -90,8 +118,20 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
}
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
SSingleWorkerCfg queryCfg = {.minNum = 0, .maxNum = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start mnode-query worker since %s", terrstr());
return -1;
}
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr());
......@@ -114,6 +154,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
dDebug("mnode workers are closed");
......
......@@ -165,8 +165,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
int32_t code = -1;
SMsgHead *pHead = pRpc->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
......
......@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor stream parser
mnode scheduler sdb wal transport cjson sync monitor executor qworker stream parser
)
if(${BUILD_TEST})
......
......@@ -59,6 +59,8 @@ typedef struct SMnodeLoad {
int64_t compStorage;
} SMnodeLoad;
typedef struct SQWorkerMgmt SQHandle;
typedef struct {
const char *name;
MndInitFp initFp;
......@@ -112,6 +114,7 @@ typedef struct SMnode {
SSdb *pSdb;
SMgmtWrapper *pWrapper;
SArray *pSteps;
SQHandle *pQuery;
SShowMgmt showMgmt;
SProfileMgmt profileMgmt;
STelemMgmt telemMgmt;
......@@ -119,7 +122,7 @@ typedef struct SMnode {
SHashObj *infosMeta;
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
SMsgCb msgCb;
} SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_QUERY_H_
#define _TD_MND_QUERY_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitQuery(SMnode *pMnode);
void mndCleanupQuery(SMnode *pMnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_QUERY_H_*/
......@@ -1034,7 +1034,9 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
usedbRsp.vgVersion = usedbReq.vgVersion;
code = 0;
}
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
// no jump, need to construct rsp
} else {
pDb = mndAcquireDb(pMnode, usedbReq.db);
if (pDb == NULL) {
......
......@@ -19,6 +19,7 @@
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
//!!!! Note: only APPEND columns in below tables, NO insert !!!!
static const SInfosTableSchema dnodesSchema[] = {{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndQuery.h"
#include "mndMnode.h"
#include "executor.h"
#include "qworker.h"
int32_t mndProcessQueryMsg(SNodeMsg *pReq) {
mTrace("message in query queue is processing");
SMnode *pMnode = pReq->pNode;
SReadHandle handle = {0};
switch (pReq->rpcMsg.msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg);
default:
mError("unknown msg type:%d in query queue", pReq->rpcMsg.msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t mndProcessFetchMsg(SNodeMsg *pReq) {
mTrace("message in fetch queue is processing");
SMnode *pMnode = pReq->pNode;
switch (pReq->rpcMsg.msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
default:
mError("unknown msg type:%d in fetch queue", pReq->rpcMsg.msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t mndInitQuery(SMnode *pMnode) {
int32_t code = qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb);
if (code) {
return code;
}
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg);
return 0;
}
void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); }
......@@ -357,6 +357,13 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
// if free flag is set, client wants to clean the resources
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
if (rowsRead < 0) {
terrno = rowsRead;
rpcFreeCont(pRsp);
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj((SShowObj*) pShow, true);
return -1;
}
}
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
......
......@@ -1613,7 +1613,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
SDbObj* pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
if (pDb == NULL) return terrno;
}
while (numOfRows < rows) {
......
......@@ -39,6 +39,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndQuery.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
......@@ -79,7 +80,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
.pCont = pReq,
.contLen = contLen,
};
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
......@@ -91,7 +92,7 @@ static void mndPullupTelem(void *param, void *tmrId) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
......@@ -217,6 +218,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
// if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
......
......@@ -31,9 +31,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config};
switch (pMsg->msgType) {
case TDMT_VND_QUERY: {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
}
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
default:
......@@ -205,7 +204,7 @@ _exit:
rpcSendResponse(&rpcMsg);
return code;
return TSDB_CODE_SUCCESS;
}
static void freeItemHelper(void *pItem) {
......
......@@ -406,9 +406,9 @@ _return:
}
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) {
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
......@@ -435,9 +435,9 @@ _return:
int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) {
int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL};
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
......@@ -496,7 +496,7 @@ _return:
int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq};
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
......@@ -1843,6 +1843,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
if (CTG_IS_META_NULL(output->metaType)) {
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
catalogRemoveTableMeta(pCtg, pTableName);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
......@@ -1951,9 +1952,9 @@ _return:
}
if (TSDB_SUPER_TABLE == tbType) {
ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid);
ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid, false);
} else {
ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname);
ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, false);
}
}
......@@ -2534,7 +2535,7 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId,
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
CTG_API_ENTER();
int32_t code = 0;
......@@ -2561,9 +2562,9 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
tNameGetFullDbName(pTableName, dbFName);
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid));
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid, true));
} else {
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname));
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
}
......@@ -2588,7 +2589,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid));
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid, true));
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
......
......@@ -449,6 +449,8 @@ typedef struct SSysTableScanInfo {
SEpSet epSet;
tsem_t ready;
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store.
SArray *scanCols; // SArray<int16_t> scan column id list
......@@ -655,7 +657,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo);
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
......
......@@ -5489,7 +5489,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
}
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param;
SOperatorInfo* operator = (SOperatorInfo *)param;
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo *)operator->info;
if (TSDB_CODE_SUCCESS == code) {
pScanResInfo->pRsp = pMsg->pData;
......@@ -5498,6 +5499,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->handle = htobe64(pRsp->handle);
pRsp->compLen = htonl(pRsp->compLen);
} else {
operator->pTaskInfo->code = code;
}
tsem_post(&pScanResInfo->ready);
......@@ -5544,6 +5547,64 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
}
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
int32_t code = TSDB_CODE_SUCCESS;
ENodeType nType = nodeType(pNode);
switch (nType) {
case QUERY_NODE_OPERATOR: {
SOperatorNode *node = (SOperatorNode *)pNode;
if (OP_TYPE_EQUAL == node->opType) {
*(int32_t *)pContext = 1;
return DEAL_RES_CONTINUE;
}
*(int32_t *)pContext = 0;
return DEAL_RES_IGNORE_CHILD;
}
case QUERY_NODE_COLUMN: {
if (1 != *(int32_t *)pContext) {
return DEAL_RES_CONTINUE;
}
SColumnNode *node = (SColumnNode *)pNode;
if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
*(int32_t *)pContext = 2;
return DEAL_RES_CONTINUE;
}
*(int32_t *)pContext = 0;
return DEAL_RES_CONTINUE;
}
case QUERY_NODE_VALUE: {
if (2 != *(int32_t *)pContext) {
return DEAL_RES_CONTINUE;
}
SValueNode *node = (SValueNode *)pNode;
char *dbName = nodesGetValueFromNode(node);
strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
*((char *)pContext + varDataLen(dbName)) = 0;
return DEAL_RES_ERROR; // stop walk
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
void getDBNameFromCondition(SNode *pCondition, char *dbName) {
if (NULL == pCondition) {
return;
}
nodesWalkNode(pCondition, getDBNameFromConditionWalker, dbName);
}
static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -5600,6 +5661,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
......@@ -5613,7 +5679,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
return NULL;
}
pMsgSendInfo->param = pInfo;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
......@@ -5623,6 +5689,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {
return NULL;
}
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle;
......@@ -5644,7 +5714,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
}
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo) {
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5654,10 +5724,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return NULL;
}
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
......@@ -8530,7 +8602,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(pHandle->meta, pResBlock, &pScanNode->tableName,
pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo);
pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
} else {
ASSERT(0);
......
......@@ -237,6 +237,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(scanFlag);
COPY_SCALAR_FIELD(scanRange);
COPY_SCALAR_FIELD(tableName);
COPY_SCALAR_FIELD(showRewrite);
return (SNode*)pDst;
}
......
......@@ -764,6 +764,8 @@ static int32_t jsonToEpSet(const SJson* pJson, void* pObj) {
}
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";
static const char* jkSysTableScanPhysiPlanAccountId = "AccountId";
static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj;
......@@ -772,6 +774,12 @@ static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSysTableScanPhysiPlanShowRewrite, pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId);
}
return code;
}
......@@ -783,6 +791,12 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId);
}
return code;
}
......
......@@ -84,12 +84,18 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName
return pName;
}
static int32_t collectUseDatabase(const char* pFullDbName, SHashObj* pDbs) {
static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) {
SFullDatabaseName name = {0};
strcpy(name.fullDbName, pFullDbName);
return taosHashPut(pDbs, pFullDbName, strlen(pFullDbName), &name, sizeof(SFullDatabaseName));
}
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
return collectUseDatabaseImpl(dbFName, pDbs);
}
static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
......@@ -98,7 +104,10 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
}
......@@ -117,7 +126,10 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo);
}
......@@ -131,7 +143,7 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
SParseContext* pParCxt = pCxt->pParseCxt;
char fullDbName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pName, fullDbName);
int32_t code = collectUseDatabase(fullDbName, pCxt->pDbs);
int32_t code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
}
......@@ -151,7 +163,10 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray*
static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
}
......@@ -170,7 +185,7 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName,
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pDbFName, pCxt->pDbs);
int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
}
......@@ -388,7 +403,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
strcpy(varDataVal(pVal->datum.p), pVal->literal);
strncpy(varDataVal(pVal->datum.p), pVal->literal, pVal->node.resType.bytes);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
......@@ -599,9 +614,9 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
// todo release
// if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// return TSDB_CODE_SUCCESS;
// }
if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
......@@ -613,9 +628,9 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
if (TSDB_CODE_SUCCESS == code) {
// todo remove
if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1);
}
//if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1);
//}
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
......@@ -1947,6 +1962,7 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
}
......
......@@ -161,6 +161,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
strcpy(pScan->tableName.dbname, pRealTable->table.dbName);
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
// set columns to scan
SNodeList* pCols = NULL;
......
......@@ -331,15 +331,14 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->showRewrite = pScanLogicNode->showRewrite;
pScan->accountId = pCxt->pPlanCxt->acctId;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
SQueryNodeAddr addr;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
taosArrayPush(pCxt->pExecNodeList, &addr);
}
SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet };
taosArrayPush(pCxt->pExecNodeList, &addr);
}
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
......
......@@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0;
}
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
......@@ -154,6 +154,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
.contLen = pInfo->msgInfo.len,
.ahandle = (void*)pInfo,
.handle = pInfo->msgInfo.handle,
.persistHandle = persistHandle,
.code = 0};
if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
......@@ -162,10 +163,14 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
assert(pInfo->fp != NULL);
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
return TSDB_CODE_SUCCESS;
}
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
}
char *jobTaskStatusStr(int32_t status) {
switch (status) {
case JOB_TASK_STATUS_NULL:
......
......@@ -69,18 +69,24 @@ enum {
typedef struct SQWDebug {
bool lockEnable;
bool statusEnable;
bool dumpEnable;
} SQWDebug;
typedef struct SQWConnInfo {
void *handle;
void *ahandle;
} SQWConnInfo;
typedef struct SQWMsg {
void *node;
char *msg;
int32_t msgLen;
void *connection;
void *node;
char *msg;
int32_t msgLen;
SQWConnInfo connInfo;
} SQWMsg;
typedef struct SQWHbInfo {
SSchedulerHbRsp rsp;
void *connection;
SQWConnInfo connInfo;
} SQWHbInfo;
typedef struct SQWPhaseInput {
......@@ -101,10 +107,6 @@ typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
void *readyConnection;
void *dropConnection;
void *cancelConnection;
bool emptyRes;
bool queryFetched;
......@@ -113,6 +115,7 @@ typedef struct SQWTaskCtx {
bool queryInQueue;
int32_t rspCode;
SQWConnInfo connInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
......@@ -120,11 +123,12 @@ typedef struct SQWTaskCtx {
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
uint64_t hbSeqId;
void *hbConnection;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
int32_t lastAccessTs; // timestamp in second
SRWLatch hbConnLock;
SQWConnInfo hbConnInfo;
SQueryNodeEpId hbEpId;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
// Qnode/Vnode level task management
......@@ -172,12 +176,16 @@ typedef struct SQWorkerMgmt {
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DUMP(param, ...) do { if (gQWDebug.dumpEnable) { qDebug("QW:%p " param, mgmt, __VA_ARGS__); } } while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
......@@ -223,8 +231,6 @@ typedef struct SQWorkerMgmt {
} \
} while (0)
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
#ifdef __cplusplus
}
#endif
......
......@@ -30,17 +30,18 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection);
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
......
......@@ -9,7 +9,7 @@
#include "tname.h"
#include "dataSinkMgt.h"
SQWDebug gQWDebug = {.statusEnable = true};
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) {
......@@ -103,6 +103,36 @@ _return:
QW_RET(code);
}
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {
}
void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
if (!gQWDebug.dumpEnable) {
return;
}
QW_LOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
SQWSchStatus *sch = NULL;
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
sch = (SQWSchStatus *)pIter;
qwDbgDumpSchInfo(sch, i);
++i;
pIter = taosHashIterate(mgmt->schHash, pIter);
}
QW_UNLOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
char *qwPhaseStr(int32_t phase) {
switch (phase) {
......@@ -166,7 +196,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
}
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
......@@ -200,7 +230,7 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType,
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType, sch));
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
nOpt = QW_NOT_EXIST_RET_ERR;
......@@ -402,6 +432,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
ctx->connInfo.handle = NULL;
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
if (ctx->sinkHandle) {
......@@ -516,7 +549,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
code = qExecTask(*taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -577,6 +610,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0;
hbInfo->connInfo = sch->hbConnInfo;
hbInfo->rsp.epId = sch->hbEpId;
QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
......@@ -588,9 +624,6 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbInfo->connection = sch->hbConnection;
hbInfo->rsp.seqId = -1;
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
......@@ -694,8 +727,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
SQWConnInfo *dropConnection = NULL;
SQWConnInfo *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -727,9 +760,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
dropConnection = &ctx->connInfo;
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL;
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break;
}
......@@ -761,9 +798,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
dropConnection = &ctx->connInfo;
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL;
dropConnection = ctx->dropConnection;
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
......@@ -789,13 +830,13 @@ _return:
}
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
......@@ -807,9 +848,8 @@ _return:
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
SQWConnInfo connInfo = {0};
SQWConnInfo *readyConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -826,11 +866,18 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
#if 0
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = ctx->readyConnection;
readyConnection = &ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo.handle = ctx->connInfo.handle;
readyConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
......@@ -838,10 +885,11 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
......@@ -870,18 +918,8 @@ _return:
}
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
}
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (code) {
......@@ -893,23 +931,26 @@ _return:
QW_RET(code);
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
......@@ -927,8 +968,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true;
......@@ -945,11 +986,11 @@ _return:
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) {
qwBuildAndSendQueryRsp(qwMsg->connection, code);
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(code);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
......@@ -968,8 +1009,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (ctx->phase == QW_PHASE_PRE_QUERY) {
ctx->connInfo.handle == qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
ctx->readyConnection = qwMsg->connection;
needRsp = false;
QW_TASK_DLOG_E("ready msg will not rsp now");
goto _return;
......@@ -1007,11 +1049,11 @@ _return:
}
if (needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(code);
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -1048,10 +1090,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
qwMsg->connInfo = ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else {
atomic_store_8((int8_t*)&ctx->queryContinue, 1);
}
......@@ -1067,8 +1110,10 @@ _return:
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code));
qwMsg->connInfo = ctx->connInfo;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
}
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -1082,7 +1127,9 @@ _return:
} while (true);
input.code = code;
QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL));
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -1097,11 +1144,14 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
......@@ -1123,7 +1173,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t*)&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
}
}
......@@ -1143,17 +1193,17 @@ _return:
}
if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
}
QW_RET(code);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool needRsp = false;
bool rsped = false;
SQWTaskCtx *ctx = NULL;
bool locked = false;
......@@ -1174,14 +1224,18 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
needRsp = true;
rsped = true;
} else {
// task not started
}
if (!needRsp) {
ctx->dropConnection = qwMsg->connection;
if (!rsped) {
ctx->connInfo.handle = qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......@@ -1204,13 +1258,12 @@ _return:
qwReleaseTaskCtx(mgmt, ctx);
}
if (TSDB_CODE_SUCCESS != code || needRsp) {
QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code));
QW_TASK_DLOG("drop msg rsped, code:%x", code);
if (TSDB_CODE_SUCCESS != code) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(code);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
......@@ -1218,38 +1271,46 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
uint64_t seqId = 0;
void *origHandle = NULL;
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
atomic_store_ptr(&sch->hbConnection, qwMsg->connection);
++sch->hbSeqId;
QW_LOCK(QW_WRITE, &sch->hbConnLock);
rsp.seqId = sch->hbSeqId;
QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, connection:%p",
sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connection);
if (sch->hbConnInfo.handle) {
rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
}
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
qwReleaseScheduler(QW_READ, mgmt);
_return:
qwBuildAndSendHbRsp(qwMsg->connection, &rsp, code);
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(code);
QW_RET(TSDB_CODE_SUCCESS);
}
void qwProcessHbTimerEvent(void *param, void *tmrId) {
return;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SQWHbInfo *rspList = NULL;
int32_t code = 0;
qwDbgDumpMgmtInfo(mgmt);
QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash);
......@@ -1288,8 +1349,8 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) {
QW_DLOG("hb on connection %p, taskNum:%d", rspList[j].connection, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code);
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
tFreeSSchedulerHbRsp(&rspList[j].rsp);
}
......
......@@ -26,6 +26,8 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS;
}
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
......@@ -44,8 +46,7 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
......@@ -54,8 +55,8 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = msg,
.contLen = contLen,
.code = code,
......@@ -66,15 +67,14 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = NULL,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -85,15 +85,15 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
......@@ -104,9 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t cod
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
......@@ -115,8 +113,8 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
......@@ -127,14 +125,14 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -144,15 +142,14 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -234,8 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
......@@ -248,12 +244,12 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
......@@ -268,6 +264,35 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
return TSDB_CODE_SUCCESS;
}
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = htonl(mgmt->nodeId);
req->sId = htobe64(sId);
req->queryId = htobe64(qId);
req->taskId = htobe64(tId);
req->refId = htobe64(rId);
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
};
rpcRegisterBrokenLinkArg(&pMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -294,10 +319,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
char* sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
taosMemoryFreeClear(sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
......@@ -326,9 +353,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processCQuery start, node:%p", node);
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
......@@ -358,9 +387,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
......@@ -418,9 +449,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processFetch start, node:%p", node);
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
......@@ -439,6 +472,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
......@@ -451,11 +485,21 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code));
QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
return TSDB_CODE_SUCCESS;
}
......@@ -484,9 +528,15 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_TASK_DLOG("processDrop start, node:%p", node);
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
......@@ -516,9 +566,11 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_DLOG("processHb start, node:%p", node);
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
......@@ -535,7 +587,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
QW_RET(qwBuildAndSendShowRsp(pMsg, code));
}
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
......@@ -544,7 +596,7 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
}
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
}
......@@ -887,8 +887,8 @@ TEST(seqTest, normalCase) {
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0);
//code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
//ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0);
......@@ -985,12 +985,12 @@ TEST(seqTest, randCase) {
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
if (qwtTestEnableSleep) {
taosUsleep(1);
}
//printf("Ready,%d\n", t++);
//qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
//code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
//if (qwtTestEnableSleep) {
// taosUsleep(1);
//}
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
......@@ -1054,7 +1054,7 @@ TEST(seqTest, multithreadRand) {
TdThread t1,t2,t3,t4,t5,t6;
taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
taosThreadCreate(&(t2), &thattr, readyThread, NULL);
//taosThreadCreate(&(t2), &thattr, readyThread, NULL);
taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
taosThreadCreate(&(t4), &thattr, dropThread, NULL);
taosThreadCreate(&(t5), &thattr, statusThread, NULL);
......
......@@ -25,6 +25,7 @@ extern "C" {
#include "planner.h"
#include "scheduler.h"
#include "thash.h"
#include "trpc.h"
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
......@@ -44,7 +45,7 @@ typedef struct SSchTrans {
typedef struct SSchHbTrans {
SRWLatch lock;
uint64_t seqId;
SRpcCtx rpcCtx;
SSchTrans trans;
} SSchHbTrans;
......@@ -76,12 +77,23 @@ typedef struct SSchedulerMgmt {
SHashObj *hbConnections;
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
void *transport;
} SSchCallbackParam;
typedef struct SSchCallbackParamHeader {
bool isHbParam;
} SSchCallbackParamHeader;
typedef struct SSchTaskCallbackParam {
SSchCallbackParamHeader head;
uint64_t queryId;
int64_t refId;
uint64_t taskId;
void *transport;
} SSchTaskCallbackParam;
typedef struct SSchHbCallbackParam {
SSchCallbackParamHeader head;
SQueryNodeEpId nodeEpId;
void *transport;
} SSchHbCallbackParam;
typedef struct SSchFlowControl {
SRWLatch lock;
......@@ -91,6 +103,11 @@ typedef struct SSchFlowControl {
SArray *taskList; // Element is SSchTask*
} SSchFlowControl;
typedef struct SSchNodeInfo {
SQueryNodeAddr addr;
void *handle;
} SSchNodeInfo;
typedef struct SSchLevel {
int32_t level;
int8_t status;
......@@ -116,7 +133,7 @@ typedef struct SSchTask {
SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SArray *execAddrs; // all tried node for current task, element is SQueryNodeAddr
SArray *execNodes; // all tried node for current task, element is SSchNodeInfo
SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
......@@ -178,6 +195,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
......@@ -205,6 +224,8 @@ extern SSchedulerMgmt schMgmt;
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
......@@ -228,6 +249,8 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
#ifdef __cplusplus
......
......@@ -387,7 +387,7 @@ void *schtCreateFetchRspThread(void *param) {
void *schtFetchRspThread(void *aa) {
SDataBuf dataBuf = {0};
SSchCallbackParam* param = NULL;
SSchTaskCallbackParam* param = NULL;
while (!schtTestStop) {
if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
......@@ -396,7 +396,7 @@ void *schtFetchRspThread(void *aa) {
taosUsleep(1);
param = (SSchCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->taskId = schtFetchTaskId;
......@@ -449,7 +449,7 @@ void* schtRunJobThread(void *aa) {
schtSetAsyncSendMsgToServer();
SSchJob *pJob = NULL;
SSchCallbackParam *param = NULL;
SSchTaskCallbackParam *param = NULL;
SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0};
uint32_t jobFinished = 0;
......@@ -484,7 +484,7 @@ void* schtRunJobThread(void *aa) {
pIter = taosHashIterate(pJob->execTasks, pIter);
}
param = (SSchCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
......@@ -504,7 +504,7 @@ void* schtRunJobThread(void *aa) {
}
param = (SSchCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
......@@ -524,7 +524,7 @@ void* schtRunJobThread(void *aa) {
}
param = (SSchCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
......@@ -544,7 +544,7 @@ void* schtRunJobThread(void *aa) {
}
param = (SSchCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
......@@ -713,6 +713,116 @@ TEST(queryTest, normalCase) {
schedulerDestroy();
}
TEST(queryTest, readyFirstCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
memset(&dag, 0, sizeof(dag));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag);
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(job);
schtFreeQueryDag(&dag);
schedulerDestroy();
}
TEST(queryTest, flowCtrlCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
......
......@@ -158,7 +158,8 @@ typedef struct {
char secured : 2;
char spi : 2;
uint32_t code; // del later
uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
uint8_t content[0]; // message body starts from here
......@@ -182,7 +183,7 @@ typedef struct {
#pragma pack(pop)
typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
......@@ -277,6 +278,7 @@ void transCtxCleanup(STransCtx* ctx);
void transCtxClear(STransCtx* ctx);
void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// queue sending msgs
typedef struct {
......@@ -295,20 +297,25 @@ void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
* if queue'size > 1, return false; else return true
*/
bool transQueuePush(STransQueue* queue, void* arg);
/*
* the size of queue
*/
int32_t transQueueSize(STransQueue* queue);
/*
* pop head from queue
*/
void* transQueuePop(STransQueue* queue);
/*
* get head from queue
* get ith from queue
*/
void* transQueueGet(STransQueue* queue);
void* transQueueGet(STransQueue* queue, int i);
/*
* rm ith from queue
*/
void* transQueueRm(STransQueue* queue, int i);
/*
* queue empty or not
*/
bool transQueueEmpty(STransQueue* queue);
/*
* clear queue
......
......@@ -25,12 +25,11 @@ typedef struct SCliConn {
void* hostThrd;
SConnBuffer readBuf;
void* data;
// SArray* cliMsgs;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
bool broken; // link broken or not
ConnStatus status; //
......@@ -54,6 +53,7 @@ typedef struct SCliMsg {
queue q;
uint64_t st;
STransMsgType type;
int sent; //(0: no send, 1: alread sent)
} SCliMsg;
typedef struct SCliThrdObj {
......@@ -136,6 +136,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
......@@ -147,10 +149,40 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
destroyCmsg(pMsg); \
return; \
} \
} while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
} \
} while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \
do { \
int i = 0; \
do { \
pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
if (pCliMsg && 0 == pCliMsg->sent) { \
break; \
} \
} while (pCliMsg != NULL); \
if (pCliMsg == NULL) { \
goto _RETURN; \
} \
} while (0)
#define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \
if (thrd->quit) { \
......@@ -173,8 +205,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
transRefCliHandle(conn); \
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1)
#define CONN_NO_PERSIST_BY_APP(conn) \
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
......@@ -183,10 +217,13 @@ static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(conn);
cliSend(conn);
return true;
}
return false;
_RETURN:
return false;
}
void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd;
......@@ -203,15 +240,38 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType;
transMsg.ahandle = NULL;
SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL;
CONN_SHOULD_RELEASE(conn, pHead);
SCliMsg* pMsg = transQueuePop(&conn->cliMsgs);
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs);
pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle);
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle);
}
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType);
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle);
}
} else {
pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.ahandle);
}
}
// buf's mem alread translated to transMsg.pCont
transClearBuffer(&conn->readBuf);
......@@ -232,6 +292,11 @@ void cliHandleResp(SCliConn* conn) {
// transUnrefCliHandle(conn);
return;
}
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) {
tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn);
return;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
......@@ -256,41 +321,54 @@ void cliHandleResp(SCliConn* conn) {
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
_RETURN:
return;
}
void cliHandleExcept(SCliConn* pConn) {
if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s cli conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn);
return;
}
}
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
bool once = false;
do {
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
if (pMsg == NULL && once) {
break;
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.ahandle = NULL;
transMsg.handle = pConn;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
TMSG_INFO(transMsg.msgType));
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
transMsg.ahandle);
}
} else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
tTrace("%s cli conn %p handle except", pTransInst->label, pConn);
if (transMsg.ahandle == NULL) {
once = true;
continue;
}
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem);
}
......@@ -364,8 +442,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
}
queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
conn->status = ConnNormal;
QUEUE_INIT(&conn->conn);
return conn;
}
......@@ -377,7 +455,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transCtxCleanup(&conn->ctx);
transQueueClear(&conn->cliMsgs);
conn->status = ConnNormal;
conn->status = ConnInPool;
char key[128] = {0};
tstrncpy(key, conn->ip, strlen(conn->ip));
......@@ -466,7 +544,7 @@ static void cliDestroy(uv_handle_t* handle) {
static bool cliHandleNoResp(SCliConn* conn) {
bool res = false;
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs);
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
transQueuePop(&conn->cliMsgs);
// taosArrayRemove(msgs, 0);
......@@ -504,7 +582,11 @@ void cliSend(SCliConn* pConn) {
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs);
SCliMsg* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(pConn);
pCliMsg->sent = 1;
STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd;
......@@ -516,7 +598,9 @@ void cliSend(SCliConn* pConn) {
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen);
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) {
char* buf = taosMemoryCalloc(1, msgLen + sizeof(STransUserMsg));
......@@ -555,6 +639,8 @@ void cliSend(SCliConn* pConn) {
pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURN:
return;
}
......@@ -643,6 +729,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
conn->hThrdIdx = pCtx->hThrdIdx;
......
......@@ -238,12 +238,15 @@ void transCtxCleanup(STransCtx* ctx) {
iter->freeFunc(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
taosHashCleanup(ctx->args);
ctx->args = NULL;
}
void transCtxMerge(STransCtx* dst, STransCtx* src) {
if (dst->args == NULL) {
dst->args = src->args;
dst->brokenVal = src->brokenVal;
src->args = NULL;
return;
}
......@@ -271,9 +274,20 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
if (cVal == NULL) {
return NULL;
}
char* ret = taosMemoryCalloc(1, cVal->len);
memcpy(ret, (char*)cVal->val, cVal->len);
return (void*)ret;
void* ret = NULL;
(*cVal->clone)(cVal->val, &ret);
return ret;
}
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
void* ret = NULL;
if (ctx->brokenVal.clone == NULL) {
return ret;
}
(*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
*msgType = ctx->brokenVal.msgType;
return ret;
}
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
......@@ -281,6 +295,9 @@ void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
queue->freeFunc = freeFunc;
}
bool transQueuePush(STransQueue* queue, void* arg) {
if (queue->q == NULL) {
return true;
}
taosArrayPush(queue->q, &arg);
if (taosArrayGetSize(queue->q) > 1) {
return false;
......@@ -288,23 +305,47 @@ bool transQueuePush(STransQueue* queue, void* arg) {
return true;
}
void* transQueuePop(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
taosArrayRemove(queue->q, 0);
return ptr;
}
int32_t transQueueSize(STransQueue* queue) {
if (queue->q == NULL) {
return 0;
}
return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
return ptr;
}
void* transQueueGet(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) {
void* transQueueRm(STransQueue* queue, int i) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
taosArrayRemove(queue->q, i);
return ptr;
}
bool transQueueEmpty(STransQueue* queue) {
//
if (queue->q == NULL) {
return true;
}
return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
......
......@@ -93,25 +93,25 @@ typedef struct SServerObj {
static const char* notify = "a";
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
......@@ -190,7 +190,7 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
transMsg.ahandle = NULL;
transMsg.ahandle = (void*)pHead->ahandle;
transMsg.handle = NULL;
transClearBuffer(&pConn->readBuf);
......@@ -199,6 +199,7 @@ static void uvHandleReq(SSrvConn* pConn) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("server conn %p acquired by server app", pConn);
}
}
if (pConn->status == ConnNormal && pHead->noResp == 0) {
......@@ -279,7 +280,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
destroySmsg(msg);
// send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs);
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
......@@ -291,6 +292,11 @@ void uvOnSendCb(uv_write_t* req, int status) {
}
transQueuePop(&conn->srvMsgs);
taosMemoryFree(msg);
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg != NULL) {
uvStartSendRespInternal(msg);
}
} else {
uvStartSendRespInternal(msg);
}
......@@ -321,6 +327,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->ahandle;
// pHead->secured = pMsg->code == 0 ? 1 : 0; //
if (!pConn->secured) {
......@@ -553,7 +560,7 @@ static bool addHandleToWorkloop(void* arg) {
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
}
......@@ -617,8 +624,6 @@ static void destroyConn(SSrvConn* conn, bool clear) {
return;
}
transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->srvMsgs);
if (clear) {
tTrace("server conn %p to be destroyed", conn);
uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
......@@ -634,11 +639,13 @@ static void uvDestroyConn(uv_handle_t* handle) {
tDebug("server conn %p destroy", conn);
uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue);
taosMemoryFree(conn->pTcp);
// taosMemoryFree(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
tTrace("work thread quit");
uv_loop_close(thrd->loop);
uv_stop(thrd->loop);
}
......@@ -700,12 +707,12 @@ End:
return NULL;
}
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop);
uv_stop(thrd->loop);
} else {
destroyAllConn(thrd);
thrd->quit = true;
}
taosMemoryFree(msg);
}
......@@ -725,7 +732,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
}
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
// send msg to client
tDebug("server conn %p start to send resp", msg->pConn);
tDebug("server conn %p start to send resp (2/2)", msg->pConn);
uvStartSendResp(msg);
}
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
......@@ -735,9 +742,11 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
transQueuePop(&conn->srvMsgs);
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
tDebug("server conn %p register brokenlink callback succ", conn);
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
......@@ -766,15 +775,16 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
void transCloseServer(void* arg) {
// impl later
SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]);
}
tDebug("send quit msg to accept thread");
uv_async_send(srv->pAcceptAsync);
taosThreadJoin(srv->thread, NULL);
for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]);
}
taosMemoryFree(srv->pThreadObj);
taosMemoryFree(srv->pAcceptAsync);
taosMemoryFree(srv->loop);
......@@ -815,7 +825,7 @@ void transReleaseSrvHandle(void* handle) {
SSrvConn* pConn = handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
STransMsg tmsg = {.handle = handle, .code = 0};
STransMsg tmsg = {.code = 0, .handle = handle, .ahandle = NULL};
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
......@@ -831,12 +841,15 @@ void transSendResponse(const STransMsg* pMsg) {
}
SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
if (pThrd->quit) {
return;
}
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *pMsg;
srvMsg->type = Normal;
tTrace("server conn %p start to send resp", pConn);
tTrace("server conn %p start to send resp (1/2)", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
void transRegisterMsg(const STransMsg* msg) {
......
......@@ -367,9 +367,10 @@ TEST_F(TransEnv, srvReleaseHandle) {
SRpcMsg resp = {0};
tr->SetSrvContinueSend(processReleaseHandleCb);
// tr->Restart(processReleaseHandleCb);
void *handle = NULL;
void * handle = NULL;
SRpcMsg req = {0};
for (int i = 0; i < 1; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1;
......@@ -383,8 +384,9 @@ TEST_F(TransEnv, srvReleaseHandle) {
}
TEST_F(TransEnv, cliReleaseHandleExcept) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 3; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1;
......@@ -403,8 +405,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) {
}
TEST_F(TransEnv, srvContinueSend) {
tr->SetSrvContinueSend(processContinueSend);
SRpcMsg req = {0}, resp = {0};
for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}, resp = {0};
memset(&req, 0, sizeof(req));
memset(&resp, 0, sizeof(resp));
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
......@@ -417,8 +421,9 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend);
// tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1;
req.pCont = rpcMallocCont(10);
......@@ -436,8 +441,9 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend);
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1;
req.pCont = rpcMallocCont(10);
......@@ -459,8 +465,9 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) {
TEST_F(TransEnv, queryExcept) {
tr->SetSrvContinueSend(processRegisterFailure);
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1;
......@@ -477,8 +484,9 @@ TEST_F(TransEnv, queryExcept) {
}
TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {0};
memset(&req, 0, sizeof(req));
req.noResp = 1;
req.msgType = 1;
req.pCont = rpcMallocCont(10);
......
......@@ -156,17 +156,15 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)taosMemoryCalloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryMalloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryMalloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
......@@ -178,17 +176,15 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)taosMemoryCalloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryMalloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryMalloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
......@@ -202,19 +198,17 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)taosMemoryCalloc(1, sizeof(STransCtx));
transCtxInit(src);
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryCalloc(1, 11);
memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
{
STransCtxVal val1 = {.val = NULL, .len = 0, .freeFunc = taosMemoryFree};
STransCtxVal val1 = {.val = NULL, .freeFunc = taosMemoryFree};
val1.val = taosMemoryCalloc(1, 11);
memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++;
}
......
......@@ -39,11 +39,10 @@ endi
print =============== drop database
sql drop database d1
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
sql show databases
if $rows != 1 then
return -1
endi
print =============== more databases
sql create database d2 vgroups 2
......
......@@ -58,11 +58,10 @@ endi
print =============== step3
sql drop database $db
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
sql show databases
if $rows != 1 then
return -1
endi
print =============== step4
sql_error drop database $db
......@@ -319,4 +318,4 @@ if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -16,17 +16,11 @@ create1:
return -1
endi
# todo remove
sql create database useless_db
sql show dnodes
if $data4_2 != ready then
goto create1
endi
# todo remove
sql drop database useless_db
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
......@@ -67,6 +61,7 @@ endi
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
sleep 1000
print =============== create database
sql_error drop database d1
......@@ -103,4 +98,4 @@ if $data03 != 0 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode2 -s stop -x SIGINT
......@@ -5,9 +5,6 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
# todo remove
sql create database useless_db
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
......@@ -83,9 +80,6 @@ if $data02 != master then
return -1
endi
# todo remove
sql drop database useless_db
print =============== create database
sql create database d1 vgroups 4;
sql create database d2;
......@@ -202,4 +196,4 @@ if $data00 != 1 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode2 -s stop -x SIGINT
......@@ -6,9 +6,6 @@ system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
# todo remove
sql create database useless_db
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
......@@ -26,9 +23,6 @@ if $data04 != ready then
goto check_dnode_ready
endi
# todo remove
sql drop database useless_db
#root@trd02 /data2/dnode $ tmq_demo --help
#Used to tmq_demo
# -c Configuration directory, default is
......
......@@ -3,9 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
# todo remove
sql create database useless_db
print =============== show users
sql show users
if $rows != 1 then
......@@ -74,7 +71,4 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
# todo remove
sql drop database useless_db
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -358,6 +358,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
taos_free_result(pSql);
} else {
int num_rows_affacted = taos_affected_rows(pSql);
taos_free_result(pSql);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册