From 23707bfcb91f7435da1036876fc9101a631e8c06 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 10:42:02 +0800 Subject: [PATCH] refactor: multi-process test mode --- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 19 ++++--- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 6 +-- source/dnode/mgmt/node_mgmt/src/dmNodes.c | 22 ++++---- source/dnode/mgmt/node_mgmt/src/dmProc.c | 14 ++--- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 51 +++++++++++-------- source/libs/qworker/src/qworkerMsg.c | 1 + 6 files changed, 60 insertions(+), 53 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index f9baa85b63..4453caed4f 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -27,16 +27,15 @@ extern "C" { typedef struct SMgmtWrapper SMgmtWrapper; -#define SINGLE_PROC 0 -#define CHILD_PROC 1 -#define PARENT_PROC 2 -#define TEST_PROC 3 -#define OnlyInSingleProc(ptype) (ptype == SINGLE_PROC) -#define OnlyInChildProc(ptype) (ptype == CHILD_PROC) -#define OnlyInParentProc(ptype) (ptype == PARENT_PROC) -#define OnlyInTestProc(ptype) (ptype == TEST_PROC) -#define InChildProc(ptype) (ptype & CHILD_PROC) -#define InParentProc(ptype) (ptype & PARENT_PROC) +#define SINGLE_PROC 0 +#define CHILD_PROC 1 +#define PARENT_PROC 2 +#define TEST_PROC 3 +#define OnlyInSingleProc(wrapper) ((wrapper)->proc.ptype == SINGLE_PROC) +#define OnlyInChildProc(wrapper) ((wrapper)->proc.ptype == CHILD_PROC) +#define OnlyInParentProc(wrapper) ((wrapper)->proc.ptype == PARENT_PROC) +#define InChildProc(wrapper) ((wrapper)->proc.ptype & CHILD_PROC) +#define InParentProc(wrapper) ((wrapper)->proc.ptype & PARENT_PROC) typedef struct { int32_t head; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 96a9b67c90..276c042422 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -164,7 +164,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { goto _OVER; } - if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) { + if (pDnode->ptype == SINGLE_PROC || (pDnode->ptype & PARENT_PROC)) { pDnode->lockfile = dmCheckRunning(tsDataDir); if (pDnode->lockfile == NULL) { goto _OVER; @@ -231,7 +231,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) { + if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); } else { @@ -276,7 +276,6 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { rsp.contLen = pReq->contLen; } rpcSendResponse(&rsp); - rpcFreeCont(pReq->pCont); } void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) { @@ -304,5 +303,4 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) { _OVER: rpcSendResponse(&rspMsg); - rpcFreeCont(pReq->pCont); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 2bc5819df2..ff9d4089cd 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -75,11 +75,11 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { SMgmtOutputOpt output = {0}; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); - if (pWrapper->ntype == DNODE || InChildProc(pWrapper->proc.ptype)) { + if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) { tmsgSetDefaultMsgCb(&input.msgCb); } - if (OnlyInSingleProc(pWrapper->proc.ptype)) { + if (OnlyInSingleProc(pWrapper)) { dInfo("node:%s, start to open", pWrapper->name); if ((*pWrapper->func.openFp)(&input, &output) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); @@ -89,7 +89,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { pWrapper->deployed = true; } - if (InParentProc(pWrapper->proc.ptype)) { + if (InParentProc(pWrapper)) { dDebug("node:%s, start to open", pWrapper->name); if (dmCreateShm(pWrapper) != 0) { return -1; @@ -98,7 +98,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { return -1; } - if (OnlyInParentProc(pWrapper->proc.ptype)) { + if (OnlyInParentProc(pWrapper)) { if (dmInitProc(pWrapper) != 0) { dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr()); return -1; @@ -118,7 +118,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, has been opened in parent process", pWrapper->name); } - if (InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper)) { dDebug("node:%s, start to open", pWrapper->name); if ((*pWrapper->func.openFp)(&input, &output) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); @@ -143,7 +143,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { } int32_t dmStartNode(SMgmtWrapper *pWrapper) { - if (OnlyInParentProc(pWrapper->proc.ptype)) return 0; + if (OnlyInParentProc(pWrapper)) return 0; if (pWrapper->func.startFp != NULL) { dDebug("node:%s, start to start", pWrapper->name); if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) { @@ -173,7 +173,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { taosMsleep(10); } - if (OnlyInParentProc(pWrapper->proc.ptype)) { + if (OnlyInParentProc(pWrapper)) { int32_t pid = pWrapper->proc.pid; if (pid > 0 && taosProcExist(pid)) { dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pid); @@ -191,7 +191,7 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { } taosWUnLockLatch(&pWrapper->latch); - if (!OnlyInSingleProc(pWrapper->proc.ptype)) { + if (!OnlyInSingleProc(pWrapper)) { dmCleanupProc(pWrapper); } @@ -242,7 +242,7 @@ static void dmCloseNodes(SDnode *pDnode) { } static void dmWatchNodes(SDnode *pDnode) { - if (!OnlyInParentProc(pDnode->ptype)) return; + if (pDnode->ptype != PARENT_PROC) return; if (pDnode->rtype == NODE_END) return; taosThreadMutexLock(&pDnode->mutex); @@ -251,7 +251,7 @@ static void dmWatchNodes(SDnode *pDnode) { SProc *proc = &pWrapper->proc; if (!pWrapper->required) continue; - if (!OnlyInParentProc(proc->ptype)) continue; + if (!OnlyInParentProc(pWrapper)) continue; if (proc->pid <= 0 || !taosProcExist(proc->pid)) { dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid); @@ -274,7 +274,7 @@ int32_t dmRunDnode(SDnode *pDnode) { } while (1) { - if (!pDnode->stop) { + if (pDnode->stop) { dInfo("dnode is about to stop"); dmSetStatus(pDnode, DND_STAT_STOPPED); dmStopNodes(pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 810b9889ef..62fdf4d5ed 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -67,7 +67,7 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { return NULL; } - if (InParentProc(proc->ptype)) { + if (proc->ptype & DND_PROC_PARENT) { if (dmInitProcMutex(queue) != 0) { return NULL; } @@ -315,7 +315,7 @@ static void *dmConsumChildQueue(void *param) { int32_t numOfMsgs = 0; int32_t code = 0; EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pReq = NULL; + SRpcMsg *pReq = NULL; dDebug("node:%s, start to consume from cqueue", proc->name); do { @@ -392,12 +392,14 @@ static void *dmConsumParentQueue(void *param) { rpcSendResponse(pRsp); } else if (ftype == DND_FUNC_REGIST) { pRsp = pHead; - dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle); + dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, + pRsp->info.handle); rpcRegisterBrokenLinkArg(pRsp); rpcFreeCont(pBody); } else if (ftype == DND_FUNC_RELEASE) { pRsp = pHead; - dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle); + dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, + pRsp->info.handle); dmRemoveProcRpcHandle(proc, pRsp->info.handle); rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code); rpcFreeCont(pBody); @@ -417,7 +419,7 @@ int32_t dmRunProc(SProc *proc) { taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (InParentProc(proc->ptype)) { + if (proc->ptype & DND_PROC_PARENT) { if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("node:%s, failed to create pthread since %s", proc->name, terrstr()); @@ -426,7 +428,7 @@ int32_t dmRunProc(SProc *proc) { dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, proc->pthread); } - if (InChildProc(proc->ptype)) { + if (proc->ptype & DND_PROC_CHILD) { if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("node:%s, failed to create cthread since %s", proc->name, terrstr()); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 414e00f7b4..d4edda58df 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -52,7 +52,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t code = -1; SRpcMsg *pMsg = NULL; bool needRelease = false; - SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; + SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SMgmtWrapper *pWrapper = NULL; dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), @@ -66,32 +66,31 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pRpc->msgType == TDMT_DND_NET_TEST) { dmProcessNetTestReq(pDnode, pRpc); - return; + goto _OVER_JUST_FREE; } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - code = qWorkerProcessFetchRsp(NULL, NULL, pRpc); - pRpc->pCont = NULL; // will be freed in qworker - return; + qWorkerProcessFetchRsp(NULL, NULL, pRpc); + goto _OVER_JUST_FREE; } else { } if (pDnode->status != DND_STAT_RUNNING) { if (pRpc->msgType == TDMT_DND_SERVER_STATUS) { dmProcessServerStartupStatus(pDnode, pRpc); + goto _OVER_JUST_FREE; } else { - SRpcMsg rspMsg = {.info = pRpc->info, .code = TSDB_CODE_APP_NOT_READY}; - rpcSendResponse(&rspMsg); + terrno = TSDB_CODE_APP_NOT_READY; + goto _OVER_RSP_FREE; } - return; } if (IsReq(pRpc) && pRpc->pCont == NULL) { terrno = TSDB_CODE_INVALID_MSG_LEN; - goto _OVER; + goto _OVER_RSP_FREE; } if (pHandle->defaultNtype == NODE_END) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; - goto _OVER; + goto _OVER_RSP_FREE; } else { pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; if (pHandle->needCheckVgId) { @@ -103,18 +102,16 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } else if (vgId == MNODE_HANDLE) { pWrapper = &pDnode->wrappers[MNODE]; } else { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; } } else { terrno = TSDB_CODE_INVALID_MSG_LEN; - goto _OVER; + goto _OVER_RSP_FREE; } } } if (dmMarkWrapper(pWrapper) != 0) { - goto _OVER; + goto _OVER_RSP_FREE; } else { needRelease = true; pRpc->info.wrapper = pWrapper; @@ -129,7 +126,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - if (InParentProc(pWrapper->proc.ptype)) { + if (InParentProc(pWrapper)) { code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen, (IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId, DND_FUNC_REQ); @@ -139,7 +136,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code == 0) { - if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) { + if (pWrapper != NULL && InParentProc(pWrapper)) { dTrace("msg:%p, is freed after push to cqueue", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); @@ -166,6 +163,16 @@ _OVER: if (needRelease) { dmReleaseWrapper(pWrapper); } + return; + +_OVER_JUST_FREE: + rpcFreeCont(pRpc->pCont); + return; + +_OVER_RSP_FREE: + rpcFreeCont(pRpc->pCont); + SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info}; + rpcSendResponse(&simpleRsp); } int32_t dmInitMsgHandle(SDnode *pDnode) { @@ -177,8 +184,8 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle *pMgmt = taosArrayGet(pArray, i); - SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); + SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; } @@ -249,7 +256,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { static inline void dmSendRsp(const SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP); } else { if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { @@ -262,7 +269,7 @@ static inline void dmSendRsp(const SRpcMsg *pMsg) { static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { SMgmtWrapper *pWrapper = pRsp->info.wrapper; - if (InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } else { SRpcMsg rsp = {0}; @@ -280,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); } else { rpcRegisterBrokenLinkArg(pMsg); @@ -289,7 +296,7 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { SMgmtWrapper *pWrapper = pHandle->wrapper; - if (InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper)) { SRpcMsg msg = {.code = type, .info = *pHandle}; dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); } else { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 72eead724e..5608fc516c 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -495,6 +495,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qProcessFetchRsp(NULL, pMsg, NULL); + pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } -- GitLab