diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 357b8f98d291ad23ebebb27326a6eeda286abbf9..5e606d067f71e5c3787c626756f4294bbaea173f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -172,10 +172,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) @@ -186,12 +182,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) @@ -205,11 +197,12 @@ enum { TD_NEW_MSG_SEG(TDMT_QND_MSG) - TD_NEW_MSG_SEG(TDMT_SND_MSG) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) + //shared by snode and vnode + TD_NEW_MSG_SEG(TDMT_STREAM_MSG) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 27f5e6f7af0fbbdee169b6ab252798ffa3f4449a..224ea4e840baf54c271ffcf5b3014a5ca9138ebb 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -210,7 +210,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index a3aab439debfbd536312a2b5cbc104b4cf0fa2e2..66ab627e3245678fe1b04ab9c8d2b8876022361a 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -94,9 +94,10 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - // Requests handled by SNODE - if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - /*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/ + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index ccc399605bd22132265b77a43dd44c4c5e506459..4c5a32536f13440e9abfaec0bf79eee6301aa761 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -210,8 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, - createReq.standby); + dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby); vmGenerateVnodeCfg(&createReq, &vnodeCfg); if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) { @@ -333,11 +332,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - // if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -352,13 +346,14 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index de7d46ed83b863c22e0d6eca7904115445648c4e..0ad1c508b275c94bbf313abd641e659e5abb37c0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -95,6 +95,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t vgId = ntohl(pHead->vgId); if (vgId == QNODE_HANDLE) { pWrapper = &pDnode->wrappers[QNODE]; + } else if (vgId == SNODE_HANDLE) { + pWrapper = &pDnode->wrappers[SNODE]; } else if (vgId == MNODE_HANDLE) { pWrapper = &pDnode->wrappers[MNODE]; } else { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 8497a57edd2a1217aaf95933792ff0a9a6037db2..3d5dab1894f40b08f3e02eb27c9fc19d5add6c09 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -132,7 +132,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -156,7 +156,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0); return 0; } @@ -222,7 +222,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); } return 0; } @@ -267,8 +267,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - /*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/ - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId); + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId); return 0; } @@ -361,7 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ - pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; @@ -407,7 +406,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ - pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { @@ -438,7 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } else { pTask->dispatchType = TASK_DISPATCH__FIXED; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ - pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only ASSERT(taosArrayGetSize(pArray) == 1); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0aa2067d3fa76f2ef7dec332c3dda35c0d29349..7abe9e3c0ddeb27906b75f50f019eec754122546 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -54,8 +54,8 @@ int32_t mndInitStream(SMnode *pMnode) { }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); - mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); + /*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 37b406466df5e229833d8271c81cdfeb28084a80..cbbe071c5fa386e178273303ae7d2b35bf1b7ad2 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -90,7 +90,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deploy // stream stop/resume // operator exec - if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) { + if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) { void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask)); if (pTask == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4b46e48dd29febc5e563a67541983f56ae379ad1..7a9c9ef393a34da7317daf3f32cfef042288e446 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -158,7 +158,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // TODO: handle error } break; - case TDMT_VND_TASK_DEPLOY: { + case TDMT_STREAM_TASK_DEPLOY: { if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { } @@ -238,21 +238,19 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg); + case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); - case TDMT_VND_TASK_RUN: { - int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg); - pMsg->pCont = NULL; - return code; - } - case TDMT_VND_TASK_DISPATCH: + case TDMT_STREAM_TASK_RUN: + return tqProcessTaskRunReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); - case TDMT_VND_TASK_RECOVER: + case TDMT_STREAM_TASK_RECOVER: return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); - case TDMT_VND_TASK_DISPATCH_RSP: + case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); - case TDMT_VND_TASK_RECOVER_RSP: + case TDMT_STREAM_TASK_RECOVER_RSP: return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); @@ -262,11 +260,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) { vTrace("message in write queue is processing"); - char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SDeleteRes res = {0}; + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SDeleteRes res = {0}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - + switch (pMsg->msgType) { case TDMT_VND_DELETE: return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 91ede155bb5157d7d60fc84adecc79bb16124e13..3298ecfc3e33785b47d830e398884926a7cbe4ac 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -26,7 +26,7 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { pRunReq->streamId = pTask->streamId; pRunReq->taskId = pTask->taskId; SRpcMsg msg = { - .msgType = TDMT_VND_TASK_RUN, + .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq), }; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 06c12ec343e05a907566a7f6cb5d2a64f602a172..a52afe93b737bcf496edd96a1c887a28f3e4e5eb 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -190,9 +190,9 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat } int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { + if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) { qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) { + } else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) { qType = WRITE_QUEUE; } else { ASSERT(0);