From d5d8581bcffcb291b2332ecc3298ee29a66524a9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 26 May 2022 21:45:59 +0800 Subject: [PATCH] refactor(stream): remove out-of-date msg --- include/common/tmsgdef.h | 15 ++++++++------- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 --- source/dnode/mnode/impl/src/mndScheduler.c | 9 ++++++--- source/dnode/snode/src/snode.c | 14 +++++++------- source/libs/stream/src/tstream.c | 12 ++++++------ 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4bbb3a4a48..4687062b18 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -195,11 +195,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) @@ -236,9 +233,13 @@ enum { // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + + TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) // Requests handled by SCHEDULER TD_NEW_MSG_SEG(TDMT_SCH_MSG) diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index bf1bb145b7..a3aab439de 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,7 +96,7 @@ SArray *smGetMsgHandles() { // Requests handled by SNODE if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER; + /*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/ code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0af503bbcf..ac56a9ba3d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -310,9 +310,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 204c6c16a3..58b51e4c54 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -359,7 +359,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // one merge only ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; @@ -404,7 +405,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { if (pStream->fixedSinkVgId == 0) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { @@ -434,7 +436,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } } else { pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ + pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only ASSERT(taosArrayGetSize(pArray) == 1); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ec75ffcae1..37b406466d 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -103,8 +103,8 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); sndMetaDeployTask(pSnode->pMeta, pTask); - } else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { - sndProcessTaskExecReq(pSnode, pMsg); + /*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ + /*sndProcessTaskExecReq(pSnode, pMsg);*/ } else { ASSERT(0); } @@ -112,9 +112,9 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { // operator exec - if (pMsg->msgType == TDMT_SND_TASK_EXEC) { - sndProcessTaskExecReq(pSnode, pMsg); - } else { - ASSERT(0); - } + /*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/ + /*sndProcessTaskExecReq(pSnode, pMsg);*/ + /*} else {*/ + ASSERT(0); + /*}*/ } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 67f7a44a71..dc0fbf2bbe 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -278,13 +278,13 @@ int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { } int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { + if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || - pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { - qType = MERGE_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { - qType = WRITE_QUEUE; + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/ + /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/ + /*qType = MERGE_QUEUE;*/ + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/ + /*qType = WRITE_QUEUE;*/ } else { ASSERT(0); } -- GitLab