From 53198f58bbd33b4a85c84baf4aef6ebc8551e7cd Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 21 Jun 2022 09:48:31 +0800 Subject: [PATCH] refactor: add post process for query msg --- include/dnode/mnode/mnode.h | 4 ++-- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 6 +++++- source/dnode/mnode/impl/src/mndMain.c | 4 +--- source/dnode/mnode/impl/src/mndQuery.c | 6 ++---- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f1cfa58f62..2509c10601 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndPreProcessMsg(SRpcMsg *pMsg); -void mndAbortPreprocessMsg(SRpcMsg *pMsg); +int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); +void mndPostProcessQueryMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 60c42d31f5..32477febeb 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { mmSendRsp(pMsg, code); } + if (code == TSDB_CODE_RPC_REDIRECT) { + mndPostProcessQueryMsg(pMsg); + } + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { pMsg->info.node = pMgmt->pMnode; - if (mndPreProcessMsg(pMsg) != 0) { + if (mndPreProcessQueryMsg(pMsg) != 0) { dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 95b721b4dd..38bac80094 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -546,12 +546,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; - if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && + if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && pMsg->msgType != TDMT_MND_TRANS_TIMER) { mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); - mndAbortPreprocessMsg(pMsg); - SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index f32a3129de..671152f9c6 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,16 +18,14 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndPreProcessMsg(SRpcMsg *pMsg) { +int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_VND_QUERY != pMsg->msgType) return 0; - SMnode *pMnode = pMsg->info.node; return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } -void mndAbortPreprocessMsg(SRpcMsg *pMsg) { +void mndPostProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_VND_QUERY != pMsg->msgType) return; - SMnode *pMnode = pMsg->info.node; qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } -- GitLab