From b26cc9c318b52168e8c9b6f1e7ca3be2ef07a87e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Oct 2022 17:03:17 +0800 Subject: [PATCH] feat(snode) --- include/common/tglobal.h | 4 +- include/dnode/snode/snode.h | 4 +- source/common/src/tglobal.c | 38 +-- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 15 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 14 +- source/dnode/mgmt/mgmt_snode/src/smInt.c | 1 + source/dnode/mgmt/mgmt_snode/src/smWorker.c | 128 +++++--- source/dnode/mnode/impl/src/mndScheduler.c | 6 +- source/dnode/snode/inc/sndInt.h | 4 +- source/dnode/snode/src/snode.c | 344 ++++++++++---------- source/dnode/vnode/src/tq/tq.c | 3 - source/dnode/vnode/src/vnd/vnodeSvr.c | 7 +- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/stream/schedSnode.sim | 36 +- 14 files changed, 322 insertions(+), 284 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 40f73eb06c..cb4426f8a9 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -60,8 +60,8 @@ extern int32_t tsNumOfVnodeSyncThreads; extern int32_t tsNumOfVnodeRsmaThreads; extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeFetchThreads; -extern int32_t tsNumOfSnodeSharedThreads; -extern int32_t tsNumOfSnodeUniqueThreads; +extern int32_t tsNumOfSnodeStreamThreads; +extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsRpcQueueMemoryAllowed; // monitor diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 3d0ef2e052..e8c64b07c4 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pMsg The request message * @param pRsp The response message */ -int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); -int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); +int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp); +int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f98f81f8c5..fbb9e04a25 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -55,8 +55,8 @@ int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; -int32_t tsNumOfSnodeSharedThreads = 2; -int32_t tsNumOfSnodeUniqueThreads = 2; +int32_t tsNumOfSnodeStreamThreads = 4; +int32_t tsNumOfSnodeWriteThreads = 1; // monitor bool tsEnableMonitor = true; @@ -133,7 +133,7 @@ int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; // stream scheduler -bool tsSchedStreamToSnode = true; +bool tsDeployOnSnode = true; /* * minimum scale for whole system, millisecond by default @@ -390,13 +390,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; - tsNumOfSnodeSharedThreads = tsNumOfCores / 4; - tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 2, 1024, 0) != 0) return -1; + tsNumOfSnodeStreamThreads = tsNumOfCores / 4; + tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, 0) != 0) return -1; - tsNumOfSnodeUniqueThreads = tsNumOfCores / 4; - tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4); - if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1; + tsNumOfSnodeWriteThreads = tsNumOfCores / 4; + tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); + if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, 0) != 0) return -1; tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); @@ -542,17 +542,17 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfSnodeSharedThreads = numOfCores / 4; - tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); - pItem->i32 = tsNumOfSnodeSharedThreads; + tsNumOfSnodeStreamThreads = numOfCores / 4; + tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4); + pItem->i32 = tsNumOfSnodeStreamThreads; pItem->stype = stype; } pItem = cfgGetItem(tsCfg, "numOfSnodeUniqueThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfSnodeUniqueThreads = numOfCores / 4; - tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4); - pItem->i32 = tsNumOfSnodeUniqueThreads; + tsNumOfSnodeWriteThreads = numOfCores / 4; + tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); + pItem->i32 = tsNumOfSnodeWriteThreads; pItem->stype = stype; } @@ -696,8 +696,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; - tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; - tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; + tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; + tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; @@ -946,9 +946,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; */ } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { - tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; + tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { - tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; + tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; } else if (strcasecmp("numOfLogLines", name) == 0) { tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32; } diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 4efe1c997b..9d519e88f0 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -30,9 +30,9 @@ typedef struct SSnodeMgmt { SMsgCb msgCb; const char *path; const char *name; - int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SSingleWorker sharedWorker; + int8_t writeWorkerInUse; + SArray *writeWroker; // SArray + SSingleWorker streamWorker; } SSnodeMgmt; // smHandle.c @@ -43,13 +43,14 @@ int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); void smStopWorker(SSnodeMgmt *pMgmt); +int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg); int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg); #ifdef __cplusplus } #endif -#endif /*_TD_DND_SNODE_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_SNODE_INT_H_*/ diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 65c96767ab..1ce7f1a84b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -69,13 +69,13 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 971a6ac4c7..28097311ac 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -45,6 +45,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->name = pInput->name; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.mgmt = pMgmt; + pMgmt->msgCb.putToQueueFp = (PutToQueueFp)smPutMsgToQueue; SSnodeOpt option = {0}; smInitOption(pMgmt, &option); diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index ad56d57f69..6a70527541 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -26,18 +26,24 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { +static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = pInfo->ahandle; for (int32_t i = 0; i < numOfMsgs; i++) { SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - - dTrace("msg:%p, get from snode-unique queue", pMsg); - if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) { - ASSERT(0); + const STraceId *trace = &pMsg->info.traceId; + + dTrace("msg:%p, get from snode-write queue", pMsg); + int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL); + if (code < 0) { + dGError("snd, msg:%p failed to process write since %s", pMsg, terrstr(code)); + if (pMsg->info.handle != NULL) { + tmsgSendRsp(pMsg); + } + } else { + smSendRsp(pMsg, 0); } - smSendRsp(pMsg, 0); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); @@ -45,13 +51,15 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num } } -static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SSnodeMgmt *pMgmt = pInfo->ahandle; +static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + const STraceId *trace = &pMsg->info.traceId; - dTrace("msg:%p, get from snode-shared queue", pMsg); - if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) { + dTrace("msg:%p, get from snode-stream queue", pMsg); + int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg); + if (code < 0) { + dGError("snd, msg:%p failed to process stream since %s", pMsg, terrstr(code)); smSendRsp(pMsg, terrno); - ASSERT(0); } dTrace("msg:%p, is freed", pMsg); @@ -60,44 +68,44 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *)); - if (pMgmt->uniqueWorkers == NULL) { + pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *)); + if (pMgmt->writeWroker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) { - SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker)); - if (pUniqueWorker == NULL) { + for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) { + SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker)); + if (pWriteWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } SMultiWorkerCfg cfg = { .max = 1, - .name = "snode-unique", - .fp = smProcessUniqueQueue, + .name = "snode-write", + .fp = smProcessWriteQueue, .param = pMgmt, }; - if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { + if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); return -1; } - if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) { + if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } } SSingleWorkerCfg cfg = { - .min = tsNumOfSnodeSharedThreads, - .max = tsNumOfSnodeSharedThreads, - .name = "snode-shared", - .fp = (FItem)smProcessSharedQueue, + .min = tsNumOfSnodeStreamThreads, + .max = tsNumOfSnodeStreamThreads, + .name = "snode-stream", + .fp = (FItem)smProcessStreamQueue, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { + if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) { dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -107,30 +115,50 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } void smStopWorker(SSnodeMgmt *pMgmt) { - for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + for (int32_t i = 0; i < taosArrayGetSize(pMgmt->writeWroker); i++) { + SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, i); tMultiWorkerCleanup(pWorker); } - taosArrayDestroy(pMgmt->uniqueWorkers); - tSingleWorkerCleanup(&pMgmt->sharedWorker); + taosArrayDestroy(pMgmt->writeWroker); + tSingleWorkerCleanup(&pMgmt->streamWorker); dDebug("snode workers are closed"); } -static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { - SMsgHead *pHead = pMsg->pCont; - pHead->vgId = htonl(pHead->vgId); - return pHead->vgId % tsNumOfSnodeUniqueThreads; -} +int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + if (pMsg == NULL) { + rpcFreeCont(pRpc->pCont); + pRpc->pCont = NULL; + return -1; + } + + SSnode *pSnode = pMgmt->pSnode; + if (pSnode == NULL) { + dError("snode: msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pMsg, terrstr(), + TMSG_INFO(pMsg->msgType), qtype); + return -1; + } -static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { - /*SMsgHead *pHead = pMsg->pCont;*/ - /*pHead->workerType = htonl(pHead->workerType);*/ - /*return pHead->workerType;*/ + SMsgHead *pHead = pRpc->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = SNODE_HANDLE; + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + + switch (qtype) { + case STREAM_QUEUE: + smPutNodeMsgToStreamQueue(pMgmt, pMsg); + break; + case WRITE_QUEUE: + smPutNodeMsgToWriteQueue(pMgmt, pMsg); + break; + default: + ASSERT(0); + } return 0; } int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -141,9 +169,8 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(pMsg); - SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); +int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -154,19 +181,14 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SSingleWorker *pWorker = &pMgmt->sharedWorker; +int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SSingleWorker *pWorker = &pMgmt->streamWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - taosWriteQitem(pWorker->queue, pMsg); - return 0; -} - -int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t workerType = smGetSWTypeFromMsg(pMsg); - if (workerType == SND_WORKER_TYPE__SHARED) { - return smPutNodeMsgToSharedQueue(pMgmt, pMsg); + if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) { + sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg); } else { - return smPutNodeMsgToUniqueQueue(pMgmt, pMsg); + taosWriteQitem(pWorker->queue, pMsg); } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 0eaff6eafe..5ee4925a96 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -32,7 +32,7 @@ #include "tname.h" #include "tuuid.h" -extern bool tsSchedStreamToSnode; +extern bool tsDeployOnSnode; static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); @@ -190,7 +190,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, pTask->nodeId = SNODE_HANDLE; pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode); - plan->execNode.nodeId = 0; + plan->execNode.nodeId = SNODE_HANDLE; plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { @@ -373,7 +373,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - if (tsSchedStreamToSnode) { + if (tsDeployOnSnode) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 5ee5507981..3fcee862a1 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -31,7 +31,9 @@ extern "C" { #endif typedef struct SSnode { - SMsgCb msgCb; + char* path; + SStreamMeta* pMeta; + SMsgCb msgCb; } SSnode; #if 0 diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index cda4663285..719e2d8fad 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -15,197 +15,197 @@ #include "executor.h" #include "sndInt.h" +#include "tstream.h" #include "tuuid.h" -/*SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; }*/ -/*void sndClose(SSnode *pSnode) {}*/ -int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } -int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } + +void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { + char *msgStr = pMsg->pCont; + char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamDispatchReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + goto FAIL; + } + tDecoderClear(&decoder); + + int32_t taskId = req.taskId; + + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + if (pTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &req, &rsp, false); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + return; + } + +FAIL: + if (pMsg->info.handle == NULL) return; + SRpcMsg rsp = { + .code = code, + .info = pMsg->info, + }; + tmsgSendRsp(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { + ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); + ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); + + pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->inputQueue = streamQueueOpen(); + pTask->outputQueue = streamQueueOpen(); + + if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + return -1; + } + + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + + pTask->pMsgCb = &pSnode->msgCb; + + pTask->startVer = ver; + + pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); + if (pTask->pState == NULL) { + return -1; + } + + SReadHandle mgHandle = { + .vnode = NULL, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), + .pStateBackend = pTask->pState, + }; + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); + ASSERT(pTask->exec.executor); + + return 0; +} SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); if (pSnode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pSnode->path = strdup(path); + if (pSnode->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } pSnode->msgCb = pOption->msgCb; -#if 0 - pSnode->pMeta = sndMetaNew(); + + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE); if (pSnode->pMeta == NULL) { - taosMemoryFree(pSnode); - return NULL; + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } -#endif + return pSnode; + +FAIL: + taosMemoryFree(pSnode->path); + taosMemoryFree(pSnode); + return NULL; } void sndClose(SSnode *pSnode) { - /*sndMetaDelete(pSnode->pMeta);*/ + streamMetaClose(pSnode->pMeta); + taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); } int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } -#if 0 -SStreamMeta *sndMetaNew() { - SStreamMeta *pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); - if (pMeta == NULL) { - return NULL; - } - pMeta->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMeta->pHash == NULL) { - taosMemoryFree(pMeta); - return NULL; - } - return pMeta; -} - -void sndMetaDelete(SStreamMeta *pMeta) { - taosHashCleanup(pMeta->pHash); - taosMemoryFree(pMeta); -} - -int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); - return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); -} - -SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) { - return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); -} - -int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { - SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - if (pTask == NULL) { - return -1; - } - taosMemoryFree(pTask->exec.qmsg); - // TODO:free executor - taosMemoryFree(pTask); - return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t)); -} - -static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); +int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { + int32_t code; + // 1.deserialize msg and build task SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; } SDecoder decoder; tDecoderInit(&decoder, (uint8_t *)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); + code = tDecodeSStreamTask(&decoder, pTask); + if (code < 0) { + tDecoderClear(&decoder); + taosMemoryFree(pTask); + return -1; } tDecoderClear(&decoder); - pTask->execStatus = TASK_EXEC_STATUS__IDLE; + ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); - pTask->inputQueue = streamQueueOpen(); - pTask->outputQueue = streamQueueOpen(); - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_INPUT_STATUS__NORMAL; - - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; - - pTask->pMsgCb = &pNode->msgCb; - - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); - ASSERT(pTask->exec.executor); - - streamSetupTrigger(pTask); - - qInfo("deploy stream: stream id %" PRId64 " task id %d child id %d on snode", pTask->streamId, pTask->taskId, - pTask->selfChildId); + // 2.save task + code = streamMetaAddTask(pSnode->pMeta, -1, pTask); + if (code < 0) { + return -1; + } - taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *)); + // 3.go through recover steps to fill history + if (pTask->fillHistory) { + streamSetParamForRecover(pTask); + streamAggRecoverPrepare(pTask); + } return 0; +} -FAIL: - if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); - if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); - if (pTask) taosMemoryFree(pTask); - return -1; +int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { + SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; + return streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); } -static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; +int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessRunReq(pTask); - return 0; + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + if (pTask) { + streamProcessRunReq(pTask); + return 0; + } else { + return -1; + } } -static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - - char *msgStr = pMsg->pCont; - char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - +int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { + char *msgStr = pMsg->pCont; + char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamDispatchReq req; SDecoder decoder; - tDecoderInit(&decoder, msgBody, msgLen); + tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessDispatchReq(pTask, &req, &rsp, true); - return 0; -} - -static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - - SStreamTaskRecoverReq *pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessRecoverReq(pTask, pReq, pMsg); - return 0; -} - -static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - - SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = pRsp->taskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessDispatchRsp(pTask, pRsp); - return 0; -} - -static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - - SStreamTaskRecoverRsp *pRsp = pMsg->pCont; - int32_t taskId = pRsp->rspTaskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - streamProcessRecoverRsp(pTask, pRsp); - return 0; -} - -static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - - char *msg = pMsg->pCont; - int32_t msgLen = pMsg->contLen; - SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; - int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t)); - ASSERT(code == 0); - if (code == 0) { - // sendrsp + int32_t taskId = req.taskId; + + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + if (pTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &req, &rsp, exec); + return 0; + } else { + return -1; } - return code; + return 0; } -static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pNode->pMeta; - +int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { char *msgStr = pMsg->pCont; char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -213,53 +213,64 @@ static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); + tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; - SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + if (pTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessRetrieveReq(pTask, &req, &rsp); + tDeleteStreamRetrieveReq(&req); return 0; + } else { + return -1; + } +} + +int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { + SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t taskId = pRsp->taskId; + SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp); + return 0; + } else { + return -1; } - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessRetrieveReq(pTask, &req, &rsp); return 0; } -static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) { +int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { // return 0; } -int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { - // stream deploy - // stream stop/resume - // operator exec +int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { + void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_STREAM_TASK_DEPLOY: - return sndProcessTaskDeployReq(pSnode, pMsg); + return sndProcessTaskDeployReq(pSnode, pReq, len); case TDMT_STREAM_TASK_DROP: - return sndProcessTaskDropReq(pSnode, pMsg); + return sndProcessTaskDropReq(pSnode, pReq, len); default: ASSERT(0); } return 0; } -int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { +int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: return sndProcessTaskRunReq(pSnode, pMsg); case TDMT_STREAM_TASK_DISPATCH: - return sndProcessTaskDispatchReq(pSnode, pMsg); - case TDMT_STREAM_TASK_RECOVER: - return sndProcessTaskRecoverReq(pSnode, pMsg); + return sndProcessTaskDispatchReq(pSnode, pMsg, true); case TDMT_STREAM_RETRIEVE: - return sndProcessTaskRecoverReq(pSnode, pMsg); + return sndProcessTaskRetrieveReq(pSnode, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return sndProcessTaskDispatchRsp(pSnode, pMsg); - case TDMT_STREAM_TASK_RECOVER_RSP: - return sndProcessTaskRecoverRsp(pSnode, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); default: @@ -267,4 +278,3 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { } return 0; } -#endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f8de4d858..2c01645389 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1272,7 +1272,6 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { - // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); @@ -1285,7 +1284,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { - ASSERT(0); char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1349,7 +1347,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ce346c19ca..4579f0c7a4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -369,8 +369,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_FETCH_RSP: return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0); - //case TDMT_SCH_CANCEL_TASK: - // return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); + // case TDMT_SCH_CANCEL_TASK: + // return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_DROP_TASK: return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_HEARTBEAT: @@ -385,9 +385,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); +#if 0 case TDMT_STREAM_TASK_DISPATCH: - // return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); +#endif /*case TDMT_STREAM_TASK_RECOVER:*/ /*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/ case TDMT_STREAM_RETRIEVE: diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index ec8539dbfc..95bf689832 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -241,7 +241,7 @@ ./test.sh -f tsim/stream/triggerSession0.sim ./test.sh -f tsim/stream/partitionby.sim ./test.sh -f tsim/stream/partitionby1.sim -# unsupport ./test.sh -f tsim/stream/schedSnode.sim +./test.sh -f tsim/stream/schedSnode.sim ./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/stream/ignoreExpiredData.sim ./test.sh -f tsim/stream/sliding.sim diff --git a/tests/script/tsim/stream/schedSnode.sim b/tests/script/tsim/stream/schedSnode.sim index 61f01baf39..2caecf50a2 100644 --- a/tests/script/tsim/stream/schedSnode.sim +++ b/tests/script/tsim/stream/schedSnode.sim @@ -6,10 +6,14 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -sql create snode on dnode 1 +sleep 50 + -sql create database test vgroups 1; +sql create database test vgroups 2; sql create database target vgroups 1; + +sql create snode on dnode 1 + sql use test; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table ts1 using st tags(1,1,1); @@ -72,23 +76,23 @@ if $data01 != 8 then goto loop1 endi -if $data02 != 4 then +if $data02 != 6 then print =====data02=$data02 goto loop1 endi -if $data03 != 4 then - print ======$data03 +if $data03 != 52 then + print ======data03=$data03 goto loop1 endi if $data04 != 52 then - print ======$data04 + print ======data04=$data04 goto loop1 endi if $data05 != 13 then - print ======$data05 + print ======data05=$data05 goto loop1 endi @@ -104,17 +108,17 @@ if $data12 != 6 then endi if $data13 != 92 then - print ======$data13 + print ======data13=$data13 return -1 endi if $data14 != 22 then - print ======$data14 + print ======data14=$data14 return -1 endi if $data15 != 3 then - print ======$data15 + print ======data15=$data15 return -1 endi @@ -130,17 +134,17 @@ if $data22 != 4 then endi if $data23 != 32 then - print ======$data23 + print ======data23=$data23 return -1 endi if $data24 != 12 then - print ======$data24 + print ======data24=$data24 return -1 endi if $data25 != 3 then - print ======$data25 + print ======data25=$data25 return -1 endi @@ -156,17 +160,17 @@ if $data32 != 30 then endi if $data33 != 180 then - print ======$data33 + print ======data33=$data33 return -1 endi if $data34 != 42 then - print ======$data34 + print ======data34=$data34 return -1 endi if $data35 != 3 then - print ======$data35 + print ======data35=$data35 return -1 endi -- GitLab