diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 89553f978beaacd7f54cdce5ab786d510ecd9fc7..1ab101f705ac3f71fad134c200a22f903e4a8e86 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -72,7 +72,6 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pMsg The request message */ int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); -int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e28618d940c53ad0f592ca14cd44e42f01ce0db9..1d7287ed0e954cf1f37b3e0dd395799747b080b3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -129,9 +129,8 @@ int32_t* taosGetErrno(); // mnode-common #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301) -#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302) -#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) -#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0304) +#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0302) +#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0303) // mnode-show #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310) diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index e01992426862492623d2d977120fc5b40e21ed4e..08c9edd854d6a2fa5b1e8ffe98078c86d6436777 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -47,10 +47,8 @@ static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SBnodeMgmt *pMgmt = pInfo->ahandle; - + int32_t code = -1; dTrace("msg:%p, get from bnode-monitor queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = -1; if (pMsg->msgType == TDMT_MON_BM_INFO) { code = bmProcessGetMonBmInfoReq(pMgmt, pMsg); @@ -58,13 +56,13 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - if (pRpc->msgType & 1U) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; bmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 6a7e0ad322efca98991c362d90d602c3ca692c26..bd78fe46effa60c8286cbd901b299f2506de6533 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -19,7 +19,6 @@ static void *dmStatusThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); - setThreadName("dnode-status"); while (1) { @@ -40,7 +39,6 @@ static void *dmStatusThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); - setThreadName("dnode-monitor"); while (1) { @@ -103,11 +101,9 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - bool isRequest = msgType & 1u; - dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType)); + dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - switch (msgType) { + switch (pMsg->msgType) { case TDMT_DND_CONFIG_DNODE: code = dmProcessConfigReq(pMgmt, pMsg); break; @@ -149,7 +145,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; } - if (isRequest) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; SRpcMsg rsp = { .code = code, diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c4314a57b15ebc18df872261296911cc62ed07bc..43ee7cd80a5aa1880092a4ca3abe4add2d42249c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -46,7 +46,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = mndProcessMsg(pMsg); } - if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); } @@ -56,28 +56,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SMnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - bool isRequest = msgType & 1U; - dTrace("msg:%p, get from mnode-query queue", pMsg); - - pMsg->info.node = pMgmt->pMnode; - code = mndProcessMsg(pMsg); - - if (isRequest) { - if (pMsg->info.handle != NULL && code != 0) { - if (code != 0 && terrno != 0) code = terrno; - mmSendRsp(pMsg, code); - } - } - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); @@ -135,7 +113,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads, .name = "mnode-query", - .fp = (FItem)mmProcessQueryQueue, + .fp = (FItem)mmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 444c42717afdbfa8559fdbd083fc5720f7d4d682..35c94b7fbe786434cfb59191c8899949099d0325 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -19,70 +19,39 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { .code = code, - .info = pMsg->info, .pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen, + .info = pMsg->info, }; tmsgSendRsp(&rsp); } -static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + dTrace("msg:%p, get from qnode queue", pMsg); - dTrace("msg:%p, get from qnode-monitor queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = -1; - - if (pMsg->msgType == TDMT_MON_QM_INFO) { - code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); - } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + switch (pMsg->msgType) { + case TDMT_MON_QM_INFO: + code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); + break; + default: + code = qndProcessQueryMsg(pMgmt->pQnode, pMsg); + break; } - if (pRpc->msgType & 1U) { + if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; qmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pRpc->pCont); - taosFreeQitem(pMsg); -} - -static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SQnodeMgmt *pMgmt = pInfo->ahandle; - - dTrace("msg:%p, get from qnode-query queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc); - - if (pRpc->msgType & 1U && code != 0) { - qmSendRsp(pMsg, code); - } - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - -static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SQnodeMgmt *pMgmt = pInfo->ahandle; - - dTrace("msg:%p, get from qnode-fetch queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc); - - if (pRpc->msgType & 1U && code != 0) { - qmSendRsp(pMsg, code); - } - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { - dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -101,9 +70,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) { - return -1; - } + if (pMsg == NULL) return -1; dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); @@ -141,7 +108,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .min = tsNumOfVnodeQueryThreads, .max = tsNumOfVnodeQueryThreads, .name = "qnode-query", - .fp = (FItem)qmProcessQueryQueue, + .fp = (FItem)qmProcessQueue, .param = pMgmt, }; @@ -154,7 +121,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .min = tsNumOfQnodeFetchThreads, .max = tsNumOfQnodeFetchThreads, .name = "qnode-fetch", - .fp = (FItem)qmProcessFetchQueue, + .fp = (FItem)qmProcessQueue, .param = pMgmt, }; @@ -167,7 +134,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "qnode-monitor", - .fp = (FItem)qmProcessMonitorQueue, + .fp = (FItem)qmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index fcfc4f4cee561e00bf4cc31a706898ee75fa2cc4..34a205232e69f057d879188fe4ed98df5b378e3a 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -28,10 +28,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; - + int32_t code = -1; dTrace("msg:%p, get from snode-monitor queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = -1; if (pMsg->msgType == TDMT_MON_SM_INFO) { code = smProcessGetMonitorInfoReq(pMgmt, pMsg); @@ -39,13 +37,13 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - if (pRpc->msgType & 1U) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; smSendRsp(pMsg, code); } dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 3a5e8a671ca3e88a10f64cdb1686c3b702eb8d4b..e7bfbdae58bf0556c8e873511476b3b17bd75a73 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -29,7 +29,7 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); @@ -92,7 +92,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SArray * pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); return; @@ -222,8 +222,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { - SRpcMsg * pRpc = pMsg; - SMsgHead *pHead = pRpc->pCont; + SMsgHead *pHead = pMsg->pCont; int32_t code = 0; pHead->contLen = ntohl(pHead->contLen); @@ -237,23 +236,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: - dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pWriteQ, pMsg); break; case SYNC_QUEUE: - dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pSyncQ, pMsg); break; case MERGE_QUEUE: - dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pMergeQ, pMsg); break; default: @@ -301,7 +300,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { - SMsgHead * pHead = pRpc->pCont; + SMsgHead *pHead = pRpc->pCont; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; @@ -469,7 +468,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "vnode-mgmt", - .fp = (FItem)vmProcessMgmtMonitorQueue, + .fp = (FItem)vmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { @@ -481,7 +480,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "vnode-monitor", - .fp = (FItem)vmProcessMgmtMonitorQueue, + .fp = (FItem)vmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 2b3ac7ae73196d36578976d8049212c8f86dc764..27f1140f2379f2db9a5856ff72ad0fbc0f42d9f2 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -137,7 +137,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); -void dmProcessFetchRsp(SRpcMsg *pMsg); // dmNodes.c int32_t dmOpenNode(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index c83550c7b1e6b198e6c1647784bd70afa72d15b5..787f5e50190fb099d5998d4f604decd8b17feb68 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -314,8 +314,3 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); } - -void dmProcessFetchRsp(SRpcMsg *pMsg) { - qWorkerProcessFetchRsp(NULL, NULL, pMsg); - // rpcFreeCont(pMsg->pCont); -} \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 8dfcb798ac966e25fc430b3bcddf7e69fd6534a2..6fbfae8b416efc68a0be9b101f1308aeba723752 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" +#include "qworker.h" static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static void dmSendRsp(SRpcMsg *pMsg); @@ -61,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmProcessNetTestReq(pDnode, pRpc); return; } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - dmProcessFetchRsp(pRpc); + qWorkerProcessFetchRsp(NULL, NULL, pRpc); return; } else { } diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 924b6db8f7c57d18b6ab1fc47b43a785449a9eca..3316a09462ff1d5ff7c940e623941c7abe72a76c 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -304,10 +304,10 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) { } code = mndCreateBnode(pMnode, pReq, pDnode, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); } @@ -414,10 +414,10 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) { } code = mndDropBnode(pMnode, pReq, pObj); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 57f7b341d460ba52e7e44678d0397929ab320914..8b2799833b8acbfd658fb4e6bc034af92d6e415e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -511,7 +511,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; } - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + code = TSDB_CODE_ACTION_IN_PROGRESS; SUBSCRIBE_OVER: mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 0bf7b240b26e545be633e2c6d923847e50bff5a1..633dda454bb5e7165d6326772b78263aaf73a0a4 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -605,10 +605,10 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { } code = mndCreateDb(pMnode, pReq, &createReq, pUser); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("db:%s, failed to create since %s", createReq.db, terrstr()); } @@ -839,10 +839,10 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { dbObj.cfgVersion++; dbObj.updateTime = taosGetTimestampMs(); code = mndAlterDb(pMnode, pReq, pDb, &dbObj); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); } @@ -1110,10 +1110,10 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) { } code = mndDropDb(pMnode, pReq, pDb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("db:%s, failed to drop since %s", dropReq.db, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 2a4cf011156e777c4834a40a614d1e62a5b99465..01ff08cef9a8ebfd681038aea915fa7906d5ed12 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -504,10 +504,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { } code = mndCreateDnode(pMnode, pReq, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; CREATE_DNODE_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr()); } @@ -585,10 +585,10 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { } code = mndDropDnode(pMnode, pReq, pDnode); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; DROP_DNODE_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index cf2edb57842c7eb0985ba8e3a27fe93e0bf28b5d..9107dab693d4c9eb6adc6599d03126d5a59a5a69 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -330,10 +330,10 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { } code = mndCreateFunc(pMnode, pReq, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("func:%s, failed to create since %s", createReq.name, terrstr()); } @@ -386,10 +386,10 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) { } code = mndDropFunc(pMnode, pReq, pFunc); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("func:%s, failed to drop since %s", dropReq.name, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 04c0a934858823cb7bc2d4a9e18637e72f9dfacf..7f86eb8b3292508c9b903c68fd6306b766ac074f 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -402,10 +402,10 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { } code = mndCreateMnode(pMnode, pReq, pDnode, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); } @@ -574,10 +574,10 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { } code = mndDropMnode(pMnode, pReq, pObj); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index b109ede81999e62b14bc7f95af486bdb830cdd33..6f42d66625aa52d4ce32b7fae3dd5947aea5fb1a 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -205,7 +205,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { } mndTransDrop(pTrans); - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_ACTION_IN_PROGRESS; } static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) { diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index c153d865523623f2e5350d12c98ce6d71e2516ba..3dc6200229b8a519fcf193393535500e98f4df20 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -306,10 +306,10 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { } code = mndCreateQnode(pMnode, pReq, pDnode, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); } @@ -416,10 +416,10 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { } code = mndDropQnode(pMnode, pReq, pObj); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5c7089e1aa65f1de308f8bdfb770c866018f36ec..78b70c9a74133b859b4175b195d4a939c37ebccc 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,37 +18,35 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndProcessQueryMsg(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; +int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { + int32_t code = -1; + SMnode *pMnode = pMsg->info.node; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; - mTrace("msg:%p, in query queue is processing", pReq); - switch (pReq->msgType) { + mTrace("msg:%p, in query queue is processing", pMsg); + switch (pMsg->msgType) { case TDMT_VND_QUERY: - return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq); + code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg); + break; case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq); - default: - mError("unknown msg type:%d in query queue", pReq->msgType); - return TSDB_CODE_VND_APP_ERROR; - } -} - -int32_t mndProcessFetchMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - mTrace("msg:%p, in fetch queue is processing", pMsg); - - switch (pMsg->msgType) { + code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg); + break; case TDMT_VND_FETCH: - return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); + break; case TDMT_VND_DROP_TASK: - return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); + break; case TDMT_VND_QUERY_HEARTBEAT: - return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); + code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); + break; default: - mError("unknown msg type:%d in fetch queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; + terrno = TSDB_CODE_VND_APP_ERROR; + mError("unknown msg type:%d in query queue", pMsg->msgType); } + + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + return code; } int32_t mndInitQuery(SMnode *pMnode) { @@ -59,9 +57,9 @@ int32_t mndInitQuery(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg); - mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg); + mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessQueryMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0f52f00d4e2059983fdf82cb3bde7fd16fae4b27..b38e901d49dee8e6c93dc629824e51bf44c71e0f 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -562,10 +562,10 @@ static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) { } code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno)); } @@ -706,10 +706,10 @@ static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) { } code = mndDropSma(pMnode, pReq, pDb, pSma); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("sma:%s, failed to drop since %s", dropReq.name, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 5f58f2c8904f6958202d5decc4ffb693ee97070c..87b61f59ecb088692941a9f57ebf89db2cefa054 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -312,10 +312,10 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { } code = mndCreateSnode(pMnode, pReq, pDnode, &createReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr()); return -1; } @@ -424,10 +424,10 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { } code = mndDropSnode(pMnode, pReq, pObj); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7485510bc6f5db1873dced18e911adfe493a5d24..33e482829964cfae2c7c31e8d7d5a48cb7c1268a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -827,10 +827,10 @@ static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) { } code = mndCreateStb(pMnode, pReq, &createReq, pDb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stb:%s, failed to create since %s", createReq.name, terrstr()); } @@ -1334,10 +1334,10 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { } code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stb:%s, failed to alter since %s", alterReq.name, terrstr()); } @@ -1475,10 +1475,10 @@ static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) { } code = mndDropStb(pMnode, pReq, pDb, pStb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 61a84fc95c335326a5f4172f66b4a1bd93ce1b41..7b6383a4704760a83e3520bdacbfb7b28efc7bd1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -472,10 +472,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; CREATE_STREAM_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c6eebb5c5d9600420806728066086699f6080b9b..b9b01a4391eeda3bcf277d4b85f315aa2accf65a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -457,10 +457,10 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { } code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; CREATE_TOPIC_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); } @@ -547,7 +547,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_ACTION_IN_PROGRESS; } static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 6210fe3fcfd66f1395a3f5c9e20fa1bee9558611..5d205197d1734951451a54dac9758979ab4a8b04 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1035,13 +1035,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } } else { mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_ACTION_IN_PROGRESS; } } static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; @@ -1049,7 +1049,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute undoActions since %s", terrstr()); } return code; @@ -1088,7 +1088,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { pTrans->stage = TRN_STAGE_COMMIT; mDebug("trans:%d, stage from redoAction to commit", pTrans->id); continueExec = true; - } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { @@ -1176,7 +1176,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { pTrans->stage = TRN_STAGE_UNDO_LOG; mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); continueExec = true; - } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index f2a42e3083d0777eecb3ffd9db2a0359c7939ff6..88e646e76548bc0db5ed2af9e6e26ceb489c5cd0 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -331,10 +331,10 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { } code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("user:%s, failed to create since %s", createReq.user, terrstr()); } @@ -536,10 +536,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { } code = mndAlterUser(pMnode, pUser, &newUser, pReq); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("user:%s, failed to alter since %s", alterReq.user, terrstr()); } @@ -613,10 +613,10 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) { } code = mndDropUser(pMnode, pReq, pUser); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("user:%s, failed to drop since %s", dropReq.user, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8c805dd8c705d41f62a5728c8bf978d0f30924d5..285ae030e1424680f6af999b6554629d8b7a7653 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -367,7 +367,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { } int32_t code = (*fp)(pMsg); - if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { terrno = code; mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); } else if (code != 0) { diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 1259363f9468c2667536a17652010914dd4e2eac..929643fcdf91ef7ba0d6a02b8a07de34f0209d54 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -43,44 +43,49 @@ void qndClose(SQnode *pQnode) { int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { - qTrace("message in qnode query queue is processing"); + int32_t code = -1; SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; + qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { - case TDMT_VND_QUERY: { - return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); - } + case TDMT_VND_QUERY: + code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); + break; case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); - default: - qError("unknown msg type:%d in query queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; - } -} - -int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { - qTrace("message in fetch queue is processing"); - switch (pMsg->msgType) { + code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); + break; case TDMT_VND_FETCH: - return qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_FETCH_RSP: - return qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_RES_READY: - return qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_TASKS_STATUS: - return qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_CANCEL_TASK: - return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_DROP_TASK: - return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg); + break; case TDMT_VND_TABLE_META: - // return vnodeGetTableMeta(pQnode, pMsg); + // code = vnodeGetTableMeta(pQnode, pMsg); + // break; case TDMT_VND_CONSUME: - // return tqProcessConsumeReq(pQnode->pTq, pMsg); + // code = tqProcessConsumeReq(pQnode->pTq, pMsg); + // break; case TDMT_VND_QUERY_HEARTBEAT: - return qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); + code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); + break; default: - qError("unknown msg type:%d in fetch queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; + qError("unknown msg type:%d in qnode queue", pMsg->msgType); + terrno = TSDB_CODE_VND_APP_ERROR; } + + if (code == 0) return TSDB_CODE_ACTION_IN_PROGRESS; + return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 297b518ac76d215b40c5527a2822dd9bf48acba6..88cc97ddd525ed20d47c3c82879fad2d792953be 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -147,9 +147,6 @@ _err: int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); -#if 0 - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; -#endif SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_VND_QUERY: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f84775f0d7e7b932b60255f05820bf0df99f81e9..3890a55ff16da11d132261c070b809d833f1b2aa 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -134,7 +134,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt cl // mnode-common TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Mnode not ready") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection") diff --git a/tests/tsim/src/simSystem.c b/tests/tsim/src/simSystem.c index 969332ba5f032efcc2098c3bcc215456b8e6a509..1c751f290a2a23c1496cdbe01f90a3cb9985c860 100644 --- a/tests/tsim/src/simSystem.c +++ b/tests/tsim/src/simSystem.c @@ -98,6 +98,8 @@ SScript *simProcessCallOver(SScript *script) { return NULL; } + if (simScriptPos == -1) return NULL; + return simScriptList[simScriptPos]; } else { simDebug("script:%s, is stopped", script->fileName);