diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4bbb3a4a487a328184d1055d7e662c64bad3d7f5..4687062b180b719cbfa0aea18948d52face10de6 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -195,11 +195,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) @@ -236,9 +233,13 @@ enum { // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + + TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) // Requests handled by SCHEDULER TD_NEW_MSG_SEG(TDMT_SCH_MSG) diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index bf1bb145b7548f1e50958e4cf718ebdc627bdfcf..a3aab439debfbd536312a2b5cbc104b4cf0fa2e2 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,7 +96,7 @@ SArray *smGetMsgHandles() { // Requests handled by SNODE if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER; + /*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/ code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0af503bbcff4db55d9dcf660fbe174612d0e8073..ac56a9ba3d7aca44839890d04e62944cf1a8c23e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -310,9 +310,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 204c6c16a3844850a99731c977e93eb87fd6032c..58b51e4c548106a393b65ab3142064cd0c249481 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -359,7 +359,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // one merge only ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; @@ -404,7 +405,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { if (pStream->fixedSinkVgId == 0) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { @@ -434,7 +436,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } } else { pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only ASSERT(taosArrayGetSize(pArray) == 1); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ec75ffcae1b2b00bf2881b1fbf6e32c5d4f8a481..37b406466df5e229833d8271c81cdfeb28084a80 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -103,8 +103,8 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); sndMetaDeployTask(pSnode->pMeta, pTask); - } else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { - sndProcessTaskExecReq(pSnode, pMsg); + /*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ + /*sndProcessTaskExecReq(pSnode, pMsg);*/ } else { ASSERT(0); } @@ -112,9 +112,9 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { // operator exec - if (pMsg->msgType == TDMT_SND_TASK_EXEC) { - sndProcessTaskExecReq(pSnode, pMsg); - } else { - ASSERT(0); - } + /*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ + /*sndProcessTaskExecReq(pSnode, pMsg);*/ + /*} else {*/ + ASSERT(0); + /*}*/ } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 67f7a44a719d24a065dbe4c3a81cc46a1324dc8d..dc0fbf2bbef00c5bb81bc03182e31edb1f729894 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -278,13 +278,13 @@ int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { } int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { + if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || - pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { - qType = MERGE_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { - qType = WRITE_QUEUE; + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/ + /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/ + /*qType = MERGE_QUEUE;*/ + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/ + /*qType = WRITE_QUEUE;*/ } else { ASSERT(0); }