diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index a246d927767f0412714a3e0b73ecc266808bb90f..2a3d95363bf551d049f298a532f9a54a3eab7581 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -38,13 +38,6 @@ typedef struct SMgmtWrapper SMgmtWrapper; #define InChildProc(ptype) (ptype & CHILD_PROC) #define InParentProc(ptype) (ptype & PARENT_PROC) -typedef enum { - PROC_FUNC_REQ = 1, - PROC_FUNC_RSP = 2, - PROC_FUNC_REGIST = 3, - PROC_FUNC_RELEASE = 4, -} EProcFuncType; - typedef struct { int32_t head; int32_t tail; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index c1e497a9de6a7e3e961b8ac9c12541457f1ad016..a43f6f1a94708841fd006436997ea55f9f1672c8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -135,7 +135,6 @@ static void dmClearVars(SDnode *pDnode) { taosThreadMutexDestroy(&pDnode->mutex); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); taosMemoryFree(pDnode); - dDebug("dnode memory is cleared, data:%p", pDnode); } SDnode *dmCreate(const SDnodeOpt *pOption) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 493a35b81b11d16f7a180163c0d6fd714de3b372..73bab07c5822da2e87aa3ba68bdf4fe357bd678f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -131,7 +131,7 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe return -1; } - if (handle != 0 && ftype == PROC_FUNC_REQ) { + if (handle != 0 && ftype == DND_FUNC_REQ) { if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) { taosThreadMutexUnlock(&queue->mutex); return -1; @@ -185,8 +185,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe taosThreadMutexUnlock(&queue->mutex); tsem_post(&queue->sem); - dTrace("node:%s, push msg:%p:%d cont:%p%d handle:%p, ftype:%d pos:%d remain:%d", queue->name, pHead, headLen, pBody, - bodyLen, (void *)handle, ftype, pos, queue->items); + dTrace("node:%s, push msg:%p %d cont:%p %d handle:%p, ftype:%s pos:%d remain:%d", queue->name, pHead, headLen, pBody, + bodyLen, (void *)handle, dmFuncStr(ftype), pos, queue->items); return 0; } @@ -269,8 +269,8 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe *pBodyLen = rawBodyLen; *pFuncType = (EProcFuncType)ftype; - dTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", queue->name, pos, ftype, queue->items, - rawHeadLen, pHead, rawBodyLen, pBody); + dTrace("node:%s, pop msg:%p %d body:%p %d, ftype:%s pos:%d remain:%d", queue->name, pHead, rawHeadLen, pBody, + rawBodyLen, dmFuncStr(ftype), pos, queue->items); return 1; } @@ -312,7 +312,7 @@ static void *dmConsumChildQueue(void *param) { int32_t bodyLen = 0; int32_t numOfMsgs = 0; int32_t code = 0; - EProcFuncType ftype = PROC_FUNC_REQ; + EProcFuncType ftype = DND_FUNC_REQ; SNodeMsg *pReq = NULL; dDebug("node:%s, start to consume from cqueue", proc->name); @@ -329,7 +329,7 @@ static void *dmConsumChildQueue(void *param) { continue; } - if (ftype != PROC_FUNC_REQ) { + if (ftype != DND_FUNC_REQ) { dFatal("node:%s, msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype); taosFreeQitem(pHead); rpcFreeCont(pBody); @@ -347,7 +347,7 @@ static void *dmConsumChildQueue(void *param) { .pCont = pReq->pRsp, .contLen = pReq->rspLen, }; - dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, PROC_FUNC_RSP); + dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP); taosFreeQitem(pHead); rpcFreeCont(pBody); rpcFreeCont(rspMsg.pCont); @@ -368,7 +368,7 @@ static void *dmConsumParentQueue(void *param) { int32_t bodyLen = 0; int32_t numOfMsgs = 0; int32_t code = 0; - EProcFuncType ftype = PROC_FUNC_REQ; + EProcFuncType ftype = DND_FUNC_REQ; SRpcMsg *pRsp = NULL; dDebug("node:%s, start to consume from pqueue", proc->name); @@ -385,18 +385,18 @@ static void *dmConsumParentQueue(void *param) { continue; } - if (ftype == PROC_FUNC_RSP) { + if (ftype == DND_FUNC_RSP) { pRsp = pHead; pRsp->pCont = pBody; dTrace("node:%s, rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); dmRemoveProcRpcHandle(proc, pRsp->handle); rpcSendResponse(pRsp); - } else if (ftype == PROC_FUNC_REGIST) { + } else if (ftype == DND_FUNC_REGIST) { pRsp = pHead; dTrace("node:%s, regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); rpcRegisterBrokenLinkArg(pRsp); rpcFreeCont(pBody); - } else if (ftype == PROC_FUNC_RELEASE) { + } else if (ftype == DND_FUNC_RELEASE) { pRsp = pHead; dTrace("node:%s, release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); dmRemoveProcRpcHandle(proc, pRsp->handle); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f9cebeb7f5ba8735f42a4723ef20dbbfdf9e675c..0a2fb71e98db5cd7e27c75c1041c4e564662b639 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -138,9 +138,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } if (InParentProc(pWrapper->proc.ptype)) { - dTrace("msg:%p, put into cqueue, handle:%p ref:%" PRId64, pMsg, pRpc->handle, pRpc->refId); + dTrace("msg:%p, put into cqueue, handle:%p refId:%" PRId64, pMsg, pRpc->handle, pRpc->refId); code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, - (isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ); + (isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, DND_FUNC_REQ); } else { code = dmProcessNodeMsg(pWrapper, pMsg); } @@ -277,7 +277,7 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (!InChildProc(pWrapper->proc.ptype)) { dmSendRpcRsp(pWrapper->pDnode, pRsp); } else { - dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); + dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } } @@ -295,7 +295,7 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp rsp.refId = pRsp->refId; rpcSendResponse(&rsp); } else { - dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); + dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } } @@ -303,7 +303,7 @@ static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg if (!InChildProc(pWrapper->proc.ptype)) { rpcRegisterBrokenLinkArg(pMsg); } else { - dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST); + dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); } } @@ -312,7 +312,7 @@ static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t rpcReleaseHandle(handle, type); } else { SRpcMsg msg = {.handle = handle, .code = type}; - dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE); + dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); } } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 3fbc6ad287bac61dd9f5af36677298c52fc5c206..9dbec6769223e64807ec98543c19f6c106f14722 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -80,6 +80,13 @@ typedef enum { DND_PROC_TEST, } EDndProcType; +typedef enum { + DND_FUNC_REQ = 1, + DND_FUNC_RSP = 2, + DND_FUNC_REGIST = 3, + DND_FUNC_RELEASE = 4, +} EProcFuncType; + typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype); @@ -157,6 +164,7 @@ const char *dmNodeProcName(EDndNodeType ntype); const char *dmNodeName(EDndNodeType ntype); const char *dmEventStr(EDndEvent etype); const char *dmProcStr(EDndProcType ptype); +const char *dmFuncStr(EProcFuncType etype); void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId); void dmGetMonitorSystemInfo(SMonSysInfo *pInfo); diff --git a/source/dnode/mgmt/node_util/src/dmUtil.c b/source/dnode/mgmt/node_util/src/dmUtil.c index e913af203b76063e176da433e5773d400764e7d8..986a3056aa56885120ae1e9aa76ba71117710964 100644 --- a/source/dnode/mgmt/node_util/src/dmUtil.c +++ b/source/dnode/mgmt/node_util/src/dmUtil.c @@ -108,6 +108,21 @@ const char *dmProcStr(EDndProcType etype) { } } +const char *dmFuncStr(EProcFuncType etype) { + switch (etype) { + case DND_FUNC_REQ: + return "req"; + case DND_FUNC_RSP: + return "rsp"; + case DND_FUNC_REGIST: + return "regist"; + case DND_FUNC_RELEASE: + return "release"; + default: + return "UNKNOWN"; + } +} + void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId) { SMgmtHandle handle = { .msgType = msgType, diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 3e8a6fc7e1fb7b96e27187c47aa2dd984d023e97..15a8b942b1d7e707dde7bb2fd63aeec082e8f4da 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { SProc *cproc = dmInitProc(&cfg); ASSERT_NE(cproc, nullptr); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RSP), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REGIST), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RELEASE), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, DND_FUNC_REQ), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REQ), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, DND_FUNC_REQ), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, DND_FUNC_REQ), 0); for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0); } - ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0); cfg.isChild = true; cfg.name = "1235_p"; @@ -186,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ); + dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, DND_FUNC_REQ); } dmRunProc(cproc); @@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { int32_t i = 0; for (i = 0; i < 20; ++i) { head.handle = (void *)((int64_t)i); - ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0); + ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, DND_FUNC_REQ), 0); } cfg.isChild = true;