diff --git a/include/util/tprocess.h b/include/util/tprocess.h index b91c9283fea23d7209390a68d6d7de63a0c2329f..0e6f25e637319811f7cd4bb8e35894a96325d6b7 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -32,6 +32,8 @@ typedef void *(*ProcFp)(void *parent, SBlockItem *pItem); typedef struct SProcQueue SProcQueue; typedef struct { + void *pParent; + bool testFlag; int32_t childQueueSize; int32_t parentQueueSize; ProcFp childFp; @@ -40,11 +42,12 @@ typedef struct { typedef struct { int32_t pid; - SProcCfg cfg; SProcQueue *pChildQueue; SProcQueue *pParentQueue; pthread_t childThread; pthread_t parentThread; + ProcFp childFp; + ProcFp parentFp; void *pParent; bool stopFlag; bool testFlag; diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 788a6d5559613cb7dcbb1c8f44e19101ee7f7523..d335cc1c664344ad2f78f82343b6bdaeed122246 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -129,14 +129,14 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { cfg.parentFp = (ProcFp)mmConsumeParentQueue; cfg.childQueueSize = 1024 * 1024; cfg.parentQueueSize = 1024 * 1024; + cfg.testFlag = true; + cfg.pParent = pDnode; pMgmt->pProcess = taosProcInit(&cfg); if (pMgmt->pProcess == NULL) { return -1; } - pMgmt->pProcess->pParent = pDnode; - pMgmt->pProcess->testFlag = true; return taosProcStart(pMgmt->pProcess); } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 706c451f6779f8ce35a79bf416cb66c6cf0ebe63..2f32099229514fa14272634229d06161a65ca275 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -24,7 +24,7 @@ static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg); static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg); static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg); static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg); -static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg); +static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { @@ -139,7 +139,7 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; } -static int32_t mndBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { +static int32_t mmBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; @@ -178,7 +178,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - if (mndBuildMsg(pMsg, pRpc) != 0) { + if (mmBuildMsg(pMsg, pRpc) != 0) { goto _OVER; } @@ -211,52 +211,52 @@ _OVER: rpcFreeCont(pRpc->pCont); } -int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMnodeMsg); +int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg) { + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); } -int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMnodeMsg); +int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg) { + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); } -int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMnodeMsg); +int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg) { + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); } -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { - return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) { + return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpc); } -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { - return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) { + return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpc); } -static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg) { +static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg) { SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; - pMnodeMsg->pMnode = pMnode; - int32_t code = dndWriteMsgToWorker(pWorker, pMnodeMsg, 0); + pMsg->pMnode = pMnode; + int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); mmRelease(pDnode, pMnode); return code; } -static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { - int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; - SMnodeMsg *pMnodeMsg = taosAllocateQitem(contLen); - if (pMnodeMsg == NULL) { +static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { + int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen; + SMnodeMsg *pMsg = taosAllocateQitem(contLen); + if (pMsg == NULL) { return -1; } - pMnodeMsg->contLen = pRpcMsg->contLen; - pMnodeMsg->pCont = (char *)pMnodeMsg + sizeof(SMnodeMsg); - memcpy(pMnodeMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); - rpcFreeCont(pRpcMsg->pCont); + pMsg->contLen = pRpc->contLen; + pMsg->pCont = (char *)pMsg + sizeof(SMnodeMsg); + memcpy(pMsg->pCont, pRpc->pCont, pRpc->contLen); + rpcFreeCont(pRpc->pCont); - int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMnodeMsg); + int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMsg); if (code != 0) { - taosFreeQitem(pMnodeMsg); + taosFreeQitem(pMsg); } return code; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index e0ebed744c5adf6e66601000229e742b61d0a5fc..17bc432e8bfa78a3b3296f3da985eca4dfd0a4a8 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -33,6 +33,8 @@ typedef struct SProcQueue { } SProcQueue; static SProcQueue *taosProcQueueInit(int32_t size) { + if (size <= 0) size = SHM_DEFAULT_SIZE; + int32_t bufSize = CEIL4(size); int32_t headSize = CEIL4(sizeof(SProcQueue)); @@ -180,18 +182,12 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } - pProc->cfg = *pCfg; - - if (pProc->cfg.childQueueSize <= 0) { - pProc->cfg.childQueueSize = SHM_DEFAULT_SIZE; - } - - if (pProc->cfg.parentQueueSize <= 0) { - pProc->cfg.parentQueueSize = SHM_DEFAULT_SIZE; - } - - pProc->pChildQueue = taosProcQueueInit(pProc->cfg.childQueueSize); - pProc->pParentQueue = taosProcQueueInit(pProc->cfg.parentQueueSize); + pProc->pParent = pCfg->pParent; + pProc->childFp = pCfg->childFp; + pProc->parentFp = pCfg->parentFp; + pProc->testFlag = pCfg->testFlag; + pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); + pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); return pProc; } @@ -218,13 +214,13 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) static void *taosProcThreadChildLoop(void *param) { SProcObj *pProc = param; - taosProcThreadLoop(pProc->pChildQueue, pProc->cfg.childFp, pProc->pParent); + taosProcThreadLoop(pProc->pChildQueue, pProc->childFp, pProc->pParent); return NULL; } static void *taosProcThreadParentLoop(void *param) { SProcObj *pProc = param; - taosProcThreadLoop(pProc->pParentQueue, pProc->cfg.parentFp, pProc->pParent); + taosProcThreadLoop(pProc->pParentQueue, pProc->parentFp, pProc->pParent); return NULL; }