diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 536ebdb26802dd81451b27b67cf6e7ce279c5bb8..b5ecc4930fd87c706b1adb8f4d20dbb7b0a5d419 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -124,13 +124,12 @@ void dnodeFreeMPeerQueue() { void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) { dnodeSendRedirectMsg(pMsg, false); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pPeer = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); } - SMnodeMsg *pPeer = taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pPeer, pMsg); - taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); + rpcFreeCont(pMsg->pCont); } static void dnodeFreeMPeerMsg(SMnodeMsg *pPeer) { diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 1a54dd1ecda42709197443f00b4b02fe8d19a719..c14c7a81586d306ff1423e8dba91e0844293c407 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -125,13 +125,12 @@ void dnodeFreeMReadQueue() { void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMReadQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pRead = mnodeCreateMsg(pMsg); + taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); } - SMnodeMsg *pRead = taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pRead, pMsg); - taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); + rpcFreeCont(pMsg->pCont); } static void dnodeFreeMReadMsg(SMnodeMsg *pRead) { diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 5bf485d61063a783680e45a6979f803b185c535b..3940a251d4fbe481473b99ca8bd7be2d1a92980e 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -125,16 +125,14 @@ void dnodeFreeMWritequeue() { void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); - rpcFreeCont(pMsg->pCont); - return; + } else { + SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); + dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); + taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } - SMnodeMsg *pWrite = taosAllocateQitem(sizeof(SMnodeMsg)); - mnodeCreateMsg(pWrite, pMsg); - - dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, - taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); - taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); + rpcFreeCont(pMsg->pCont); } static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4a5dc31a9fc5008b1a1c8288d9798d23acfb98b6..dcb48f7833abf921d1085e1cbb14b9cfbf2aa8b7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -47,6 +47,11 @@ typedef struct { int32_t * vnodeList; } SOpenVnodeThread; +typedef struct { + SRpcMsg rpcMsg; + char pCont[]; +} SMgmtMsg; + void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; @@ -172,38 +177,46 @@ void dnodeCleanupMgmt() { vnodeCleanupResources(); } -void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { - void *item; +static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { + int32_t size = sizeof(SMgmtMsg) + pMsg->contLen; + SMgmtMsg *pMgmt = taosAllocateQitem(size); + if (pMgmt == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } - item = taosAllocateQitem(sizeof(SRpcMsg)); - if (item) { - memcpy(item, pMsg, sizeof(SRpcMsg)); - taosWriteQitem(tsMgmtQueue, 1, item); - } else { - SRpcMsg rsp = { - .handle = pMsg->handle, - .pCont = NULL, - .code = TSDB_CODE_DND_OUT_OF_MEMORY - }; - + pMgmt->rpcMsg = *pMsg; + pMgmt->rpcMsg.pCont = pMgmt->pCont; + memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); + taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); + + return TSDB_CODE_SUCCESS; +} + +void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { + int32_t code = dnodeWriteToMgmtQueue(pMsg); + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = code}; rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); } + + rpcFreeCont(pMsg->pCont); } static void *dnodeProcessMgmtQueue(void *param) { - SRpcMsg *pMsg; - SRpcMsg rsp = {0}; - int type; - void * handle; + SMgmtMsg *pMgmt; + SRpcMsg * pMsg; + SRpcMsg rsp = {0}; + int32_t qtype; + void * handle; while (1) { - if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { + if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); break; } - dDebug("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + pMsg = &pMgmt->rpcMsg; + dDebug("%p, msg:%p:%s will be processed", pMsg->ahandle, pMgmt, taosMsg[pMsg->msgType]); if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { @@ -211,10 +224,9 @@ static void *dnodeProcessMgmtQueue(void *param) { } rsp.handle = pMsg->handle; - rsp.pCont = NULL; + rsp.pCont = NULL; rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 2bf7718058640a8ce7248dde964bbf2a3aae49cb..128e4d35a4d2500ae49fa4e72d90813bc98b414b 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -50,9 +50,10 @@ typedef struct SMnodeMsg { int32_t code; void * pObj; SRpcMsg rpcMsg; + char pCont[]; } SMnodeMsg; -void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg); +void * mnodeCreateMsg(SRpcMsg *pRpcMsg); int32_t mnodeInitMsg(SMnodeMsg *pMsg); void mnodeCleanupMsg(SMnodeMsg *pMsg); diff --git a/src/mnode/src/mnodeInt.c b/src/mnode/src/mnodeInt.c index fb1b8741a9f8895c579ad0e549dc8f3c15f7618c..98ba9d1e14c62fc0e59a5cd35c7acbc10a3ec4f1 100644 --- a/src/mnode/src/mnodeInt.c +++ b/src/mnode/src/mnodeInt.c @@ -18,7 +18,7 @@ #include "taosmsg.h" #include "taoserror.h" #include "trpc.h" -#include "tcache.h" +#include "tqueue.h" #include "mnode.h" #include "dnode.h" #include "mnodeDef.h" @@ -34,8 +34,15 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg) { - pMsg->rpcMsg = *rpcMsg; +void *mnodeCreateMsg(SRpcMsg *pRpcMsg) { + int32_t size = sizeof(SMnodeMsg) + pRpcMsg->contLen; + SMnodeMsg *pMsg = taosAllocateQitem(size); + + pMsg->rpcMsg = *pRpcMsg; + pMsg->rpcMsg.pCont = pMsg->pCont; + memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); + + return pMsg; } int32_t mnodeInitMsg(SMnodeMsg *pMsg) { @@ -54,7 +61,6 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) { void mnodeCleanupMsg(SMnodeMsg *pMsg) { if (pMsg != NULL) { - if (pMsg->rpcMsg.pCont) rpcFreeCont(pMsg->rpcMsg.pCont); if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser); if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb); if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup);