提交 53198f58 编写于 作者: S Shengliang Guan

refactor: add post process for query msg

上级 4a4d5cbd
...@@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); ...@@ -95,8 +95,8 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/ */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreProcessMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg);
void mndAbortPreprocessMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg);
/** /**
* @brief Generate machine code * @brief Generate machine code
......
...@@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -68,6 +68,10 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
if (code == TSDB_CODE_RPC_REDIRECT) {
mndPostProcessQueryMsg(pMsg);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -116,7 +120,7 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
if (mndPreProcessMsg(pMsg) != 0) { if (mndPreProcessQueryMsg(pMsg) != 0) {
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
return -1; return -1;
} }
......
...@@ -546,12 +546,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -546,12 +546,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
pMsg->msgType != TDMT_MND_TRANS_TIMER) { pMsg->msgType != TDMT_MND_TRANS_TIMER) {
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
mndAbortPreprocessMsg(pMsg);
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
......
...@@ -18,16 +18,14 @@ ...@@ -18,16 +18,14 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "qworker.h" #include "qworker.h"
int32_t mndPreProcessMsg(SRpcMsg *pMsg) { int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return 0; if (TDMT_VND_QUERY != pMsg->msgType) return 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
void mndAbortPreprocessMsg(SRpcMsg *pMsg) { void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return; if (TDMT_VND_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册