From 712587511c6fd49a4668506568cb2b687f6bee92 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Mar 2022 16:46:29 +0800 Subject: [PATCH] shm --- include/dnode/mnode/mnode.h | 10 +--- .../dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c | 40 +++++++++---- source/dnode/mnode/impl/inc/mndInt.h | 1 - source/dnode/mnode/impl/src/mnode.c | 58 +++++-------------- 4 files changed, 46 insertions(+), 63 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 513da09f79..18ac74a940 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -115,21 +115,13 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr */ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -/** - * @brief Cleanup mnode msg. - * - * @param pMsg The request msg. - * @param code The error code. - */ -void mndSendRsp(SMndMsg *pMsg, int32_t code); - /** * @brief Process the read, write, sync request. * * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -void mndProcessMsg(SMndMsg *pMsg); +int32_t mndProcessMsg(SMndMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 3369d90bd5..307235b728 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -25,21 +25,21 @@ static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg); static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); -static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg); +static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { SMndMgmt *pMgmt = &pDnode->mmgmt; - if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeMsgQueue) != 0) { dError("failed to start mnode write worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeMsgQueue) != 0) { dError("failed to start mnode sync worker since %s", terrstr()); return -1; } @@ -271,18 +271,36 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} -static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { - SMndMgmt *pMgmt = &pDnode->mmgmt; +static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { + SMnode *pMnode = mmAcquire(pDnode); + SRpcMsg *pRpc = &pMsg->rpcMsg; + tmsg_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + bool isReq = (pRpc->msgType & 1U); + int32_t code = -1; - SMnode *pMnode = mmAcquire(pDnode); if (pMnode != NULL) { pMsg->pMnode = pMnode; - mndProcessMsg(pMsg); + code = mndProcessMsg(pMsg); mmRelease(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); + } + + if (isReq) { + if (pMsg->rpcMsg.handle == NULL) return; + if (code == 0) { + SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; + rpcSendResponse(&rsp); + } else { + if (terrno == TSDB_CODE_APP_NOT_READY) { + dndSendRedirectRsp(pDnode, pRpc); + } else if (terrno == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + } else { + SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno}; + rpcSendResponse(&rsp); + } + } } taosFreeQitem(pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pRpc->pCont); } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index b74ddfdf0e..3577f1dda3 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -128,7 +128,6 @@ typedef struct SMnode { int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg); -void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); uint64_t mndGenerateUid(char *name, int32_t len); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index b1898c061b..8ec1b959fb 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -60,12 +60,6 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg); } -void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) { - if (pMnode != NULL && pMnode->sendRedirectRspFp != NULL) { - (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg); - } -} - static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -390,65 +384,45 @@ void mndDestroy(const char *path) { mDebug("mnode is destroyed"); } -void mndSendRsp(SMndMsg *pMsg, int32_t code) { - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; - rpcSendResponse(&rpcRsp); -} - -void mndProcessMsg(SMndMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - int32_t code = 0; - tmsg_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - bool isReq = (msgType & 1U); +int32_t mndProcessMsg(SMndMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SRpcMsg *pRpc = &pMsg->rpcMsg; + tmsg_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + bool isReq = (pRpc->msgType & 1U); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); if (isReq && !mndIsMaster(pMnode)) { - code = TSDB_CODE_APP_NOT_READY; + terrno = TSDB_CODE_APP_NOT_READY; mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - goto PROCESS_RPC_END; + return -1; } - if (isReq && pMsg->rpcMsg.pCont == NULL) { - code = TSDB_CODE_MND_INVALID_MSG_LEN; + if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) { + terrno = TSDB_CODE_MND_INVALID_MSG_LEN; mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - goto PROCESS_RPC_END; + return -1; } MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)]; if (fp == NULL) { - code = TSDB_CODE_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle); - goto PROCESS_RPC_END; + return -1; } - code = (*fp)(pMsg); + int32_t code = (*fp)(pMsg); if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + terrno = code; mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); - return; } else if (code != 0) { - code = terrno; mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - goto PROCESS_RPC_END; } else { mTrace("msg:%p, is processed, app:%p", pMsg, ahandle); } -PROCESS_RPC_END: - if (isReq) { - if (pMsg->rpcMsg.handle == NULL) return; - - if (code == TSDB_CODE_APP_NOT_READY) { - mndSendRedirectRsp(pMnode, &pMsg->rpcMsg); - } else if (code != 0) { - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code}; - rpcSendResponse(&rpcRsp); - } else { - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; - rpcSendResponse(&rpcRsp); - } - } + return code; } void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { -- GitLab