未验证 提交 ab104d86 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13045 from taosdata/feature/stream

refactor(stream): remove out-of-date msg
...@@ -195,11 +195,8 @@ enum { ...@@ -195,11 +195,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
...@@ -236,9 +233,13 @@ enum { ...@@ -236,9 +233,13 @@ enum {
// Requests handled by SNODE // Requests handled by SNODE
TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
// Requests handled by SCHEDULER // Requests handled by SCHEDULER
TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG)
......
...@@ -96,7 +96,7 @@ SArray *smGetMsgHandles() { ...@@ -96,7 +96,7 @@ SArray *smGetMsgHandles() {
// Requests handled by SNODE // Requests handled by SNODE
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER; /*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -310,9 +310,6 @@ SArray *vmGetMsgHandles() { ...@@ -310,9 +310,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
......
...@@ -359,7 +359,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -359,7 +359,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// one merge only // one merge only
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
...@@ -404,7 +405,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -404,7 +405,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDb); ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
...@@ -434,7 +436,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -434,7 +436,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
} else { } else {
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0); SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only // one sink only
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);
......
...@@ -103,8 +103,8 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -103,8 +103,8 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask); sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { /*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
sndProcessTaskExecReq(pSnode, pMsg); /*sndProcessTaskExecReq(pSnode, pMsg);*/
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -112,9 +112,9 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -112,9 +112,9 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// operator exec // operator exec
if (pMsg->msgType == TDMT_SND_TASK_EXEC) { /*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
sndProcessTaskExecReq(pSnode, pMsg); /*sndProcessTaskExecReq(pSnode, pMsg);*/
} else { /*} else {*/
ASSERT(0); ASSERT(0);
} /*}*/
} }
...@@ -278,13 +278,13 @@ int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -278,13 +278,13 @@ int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
} }
int32_t qType; int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
qType = FETCH_QUEUE; qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/
qType = MERGE_QUEUE; /*qType = MERGE_QUEUE;*/
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/
qType = WRITE_QUEUE; /*qType = WRITE_QUEUE;*/
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册