From 1061eef1441c56058b7fc4d31df51c7d9e0d157a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Apr 2023 10:49:18 +0800 Subject: [PATCH] fix:move consumer msg from fetch thread to query thread --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d61eb3ec03..fc724f2b45 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -519,7 +519,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3c8687fa4d..579ef8a952 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -487,11 +487,16 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { + vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); + return 0; + } + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: @@ -499,6 +504,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_TMQ_CONSUME: + return tqProcessPollReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -508,17 +515,12 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || - pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && + pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } - if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { - vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); - return 0; - } - switch (pMsg->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: @@ -537,8 +539,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_TMQ_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: -- GitLab