diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index e01992426862492623d2d977120fc5b40e21ed4e..08c9edd854d6a2fa5b1e8ffe98078c86d6436777 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -47,10 +47,8 @@ static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SBnodeMgmt *pMgmt = pInfo->ahandle; - + int32_t code = -1; dTrace("msg:%p, get from bnode-monitor queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = -1; if (pMsg->msgType == TDMT_MON_BM_INFO) { code = bmProcessGetMonBmInfoReq(pMgmt, pMsg); @@ -58,13 +56,13 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - if (pRpc->msgType & 1U) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; bmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 6a7e0ad322efca98991c362d90d602c3ca692c26..bd78fe46effa60c8286cbd901b299f2506de6533 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -19,7 +19,6 @@ static void *dmStatusThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); - setThreadName("dnode-status"); while (1) { @@ -40,7 +39,6 @@ static void *dmStatusThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); - setThreadName("dnode-monitor"); while (1) { @@ -103,11 +101,9 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - bool isRequest = msgType & 1u; - dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType)); + dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - switch (msgType) { + switch (pMsg->msgType) { case TDMT_DND_CONFIG_DNODE: code = dmProcessConfigReq(pMgmt, pMsg); break; @@ -149,7 +145,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; } - if (isRequest) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; SRpcMsg rsp = { .code = code, diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index fcfc4f4cee561e00bf4cc31a706898ee75fa2cc4..34a205232e69f057d879188fe4ed98df5b378e3a 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -28,10 +28,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; - + int32_t code = -1; dTrace("msg:%p, get from snode-monitor queue", pMsg); - SRpcMsg *pRpc = pMsg; - int32_t code = -1; if (pMsg->msgType == TDMT_MON_SM_INFO) { code = smProcessGetMonitorInfoReq(pMgmt, pMsg); @@ -39,13 +37,13 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - if (pRpc->msgType & 1U) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; smSendRsp(pMsg, code); } dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 3a5e8a671ca3e88a10f64cdb1686c3b702eb8d4b..e7bfbdae58bf0556c8e873511476b3b17bd75a73 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -29,7 +29,7 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); @@ -92,7 +92,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SArray * pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); return; @@ -222,8 +222,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { - SRpcMsg * pRpc = pMsg; - SMsgHead *pHead = pRpc->pCont; + SMsgHead *pHead = pMsg->pCont; int32_t code = 0; pHead->contLen = ntohl(pHead->contLen); @@ -237,23 +236,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: - dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pWriteQ, pMsg); break; case SYNC_QUEUE: - dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pSyncQ, pMsg); break; case MERGE_QUEUE: - dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pMergeQ, pMsg); break; default: @@ -301,7 +300,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { - SMsgHead * pHead = pRpc->pCont; + SMsgHead *pHead = pRpc->pCont; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; @@ -469,7 +468,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "vnode-mgmt", - .fp = (FItem)vmProcessMgmtMonitorQueue, + .fp = (FItem)vmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { @@ -481,7 +480,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "vnode-monitor", - .fp = (FItem)vmProcessMgmtMonitorQueue, + .fp = (FItem)vmProcessQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 2b3ac7ae73196d36578976d8049212c8f86dc764..27f1140f2379f2db9a5856ff72ad0fbc0f42d9f2 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -137,7 +137,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); -void dmProcessFetchRsp(SRpcMsg *pMsg); // dmNodes.c int32_t dmOpenNode(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index c83550c7b1e6b198e6c1647784bd70afa72d15b5..787f5e50190fb099d5998d4f604decd8b17feb68 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -314,8 +314,3 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); } - -void dmProcessFetchRsp(SRpcMsg *pMsg) { - qWorkerProcessFetchRsp(NULL, NULL, pMsg); - // rpcFreeCont(pMsg->pCont); -} \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 8dfcb798ac966e25fc430b3bcddf7e69fd6534a2..6fbfae8b416efc68a0be9b101f1308aeba723752 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" +#include "qworker.h" static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static void dmSendRsp(SRpcMsg *pMsg); @@ -61,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmProcessNetTestReq(pDnode, pRpc); return; } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - dmProcessFetchRsp(pRpc); + qWorkerProcessFetchRsp(NULL, NULL, pRpc); return; } else { } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 297b518ac76d215b40c5527a2822dd9bf48acba6..88cc97ddd525ed20d47c3c82879fad2d792953be 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -147,9 +147,6 @@ _err: int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); -#if 0 - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; -#endif SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_VND_QUERY: