diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 21d8405f582a7f3e1e47cb137a34b3fae71cc8f3..f1cfa58f62f8b80dc7f3725fcba6e0d3395e3d39 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -96,6 +96,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessMsg(SRpcMsg *pMsg); +void mndAbortPreprocessMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index f3f147955a03cb99f1ea806441eddd63ba8f96fe..3b8a37f4208386eb06ff4f3153fab271e3ddb011 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -64,6 +64,8 @@ typedef struct { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5ea55d558ff39afda2f70ecb067abbd62adae073..95b721b4dd651ffd174e1b0a9bf4b15172af7203 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -550,6 +550,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { pMsg->msgType != TDMT_MND_TRANS_TIMER) { mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + mndAbortPreprocessMsg(pMsg); + SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5374f48e47521f7cdc477ac469ce0bf5fbbf7aef..f32a3129dece8e55d37d68d61e39787e31fb8bdb 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -25,6 +25,13 @@ int32_t mndPreProcessMsg(SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } +void mndAbortPreprocessMsg(SRpcMsg *pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) return; + + SMnode *pMnode = pMsg->info.node; + qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); +} + int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index ecff861f50d5c31593edeaab28097308a5316b64..8c7c030dce584dca94d843d85876b3cf29538ac2 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -23,6 +23,7 @@ extern "C" { #include "qwInt.h" #include "dataSinkMgt.h" +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 5635ec8fc61c2de62b7de4a34168fdcf03e40820..82a62b5c5a65c5a8e62e4103b674c81b4fe6b03a 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -283,6 +283,26 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == qWorkerMgmt || NULL == pMsg) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSubQueryMsg *msg = pMsg->pCont; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + int64_t rId = msg->refId; + + QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); + qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 800cc4c6e59ff7da06f699ce6fb8294819873c14..0f95da42e208e83115fe88fad03b84a7b8691710 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -482,6 +482,13 @@ _return: QW_RET(code); } +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { + QW_ERR_RET(qwDropTask(QW_FPARAMS())); + + QW_RET(TSDB_CODE_SUCCESS); +} + + int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false;