diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c6233a4f18b058aba181acce5d42cfca2c6315b0..43ee7cd80a5aa1880092a4ca3abe4add2d42249c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,24 +56,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SMnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; - dTrace("msg:%p, get from mnode-query queue", pMsg); - - pMsg->info.node = pMgmt->pMnode; - code = mndProcessMsg(pMsg); - - if (IsReq(pMsg) && pMsg->info.handle != NULL && code != 0) { - if (terrno != 0) code = terrno; - mmSendRsp(pMsg, code); - } - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); @@ -131,7 +113,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads, .name = "mnode-query", - .fp = (FItem)mmProcessQueryQueue, + .fp = (FItem)mmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5c7089e1aa65f1de308f8bdfb770c866018f36ec..0c0d771a24021c0ec22cb49b501002b173ce3a70 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -19,36 +19,49 @@ #include "qworker.h" int32_t mndProcessQueryMsg(SRpcMsg *pReq) { + int32_t code = -1; SMnode *pMnode = pReq->info.node; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; mTrace("msg:%p, in query queue is processing", pReq); switch (pReq->msgType) { case TDMT_VND_QUERY: - return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq); + code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq); + break; case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq); + code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq); + break; default: + terrno = TSDB_CODE_VND_APP_ERROR; mError("unknown msg type:%d in query queue", pReq->msgType); - return TSDB_CODE_VND_APP_ERROR; } + + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + return code; } int32_t mndProcessFetchMsg(SRpcMsg *pMsg) { + int32_t code = -1; SMnode *pMnode = pMsg->info.node; mTrace("msg:%p, in fetch queue is processing", pMsg); switch (pMsg->msgType) { case TDMT_VND_FETCH: - return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); + break; case TDMT_VND_DROP_TASK: - return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); + break; case TDMT_VND_QUERY_HEARTBEAT: - return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); + break; default: + terrno = TSDB_CODE_VND_APP_ERROR; mError("unknown msg type:%d in fetch queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; } + + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + return code; } int32_t mndInitQuery(SMnode *pMnode) {