From 2c694a508029e4d8187f74c538fa012c3064c158 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 18:10:54 +0800 Subject: [PATCH] refactor: remove rpc client in executor and scanoperator --- include/common/tmsgcb.h | 1 + source/dnode/mgmt/implement/inc/dmImp.h | 6 +- source/dnode/mgmt/implement/src/dmExec.c | 41 +++--- source/dnode/mgmt/implement/src/dmHandle.c | 5 +- source/dnode/mgmt/implement/src/dmObj.c | 3 - source/dnode/mgmt/implement/src/dmTransport.c | 124 ++++++++---------- source/dnode/mgmt/interface/src/dmInt.c | 10 +- source/libs/executor/inc/executorimpl.h | 3 - source/libs/executor/src/executorimpl.c | 6 +- source/libs/executor/src/scanoperator.c | 3 +- 10 files changed, 100 insertions(+), 102 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 4002db06ea..b6c96bb2d1 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -57,6 +57,7 @@ typedef struct { RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; ReportStartup reportStartupFp; + void* clientRpc; } SMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); diff --git a/source/dnode/mgmt/implement/inc/dmImp.h b/source/dnode/mgmt/implement/inc/dmImp.h index 32869aee9e..8a1a116ab3 100644 --- a/source/dnode/mgmt/implement/inc/dmImp.h +++ b/source/dnode/mgmt/implement/inc/dmImp.h @@ -26,8 +26,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper); void dmCloseNode(SMgmtWrapper *pWrapper); // dmTransport.c -int32_t dmInitTrans(SDnode *pDnode); -void dmCleanupTrans(SDnode *pDnode); +int32_t dmInitServer(SDnode *pDnode); +void dmCleanupServer(SDnode *pDnode); +int32_t dmInitClient(SDnode *pDnode); +void dmCleanupClient(SDnode *pDnode); SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper); SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); int32_t dmInitMsgHandle(SDnode *pDnode); diff --git a/source/dnode/mgmt/implement/src/dmExec.c b/source/dnode/mgmt/implement/src/dmExec.c index 06001028b5..6999cee037 100644 --- a/source/dnode/mgmt/implement/src/dmExec.c +++ b/source/dnode/mgmt/implement/src/dmExec.c @@ -213,10 +213,12 @@ static int32_t dmOpenNodes(SDnode *pDnode) { } pWrapper->procType = DND_PROC_CHILD; + if (dmInitClient(pDnode) != 0) { + return -1; + } - SMsgCb msgCb = pDnode->data.msgCb; - msgCb.pWrapper = pWrapper; - tmsgSetDefaultMsgCb(&msgCb); + pDnode->data.msgCb = dmGetMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&pDnode->data.msgCb); if (dmOpenNode(pWrapper) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); @@ -234,6 +236,15 @@ static int32_t dmOpenNodes(SDnode *pDnode) { pWrapper->procType = DND_PROC_SINGLE; } + if (n == DNODE) { + if (dmInitClient(pDnode) != 0) { + return -1; + } + + pDnode->data.msgCb = dmGetMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&pDnode->data.msgCb); + } + if (dmOpenNode(pWrapper) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; @@ -281,21 +292,21 @@ static void dmProcessProcHandle(void *handle) { } static void dmWatchNodes(SDnode *pDnode) { + if (pDnode->ptype != DND_PROC_PARENT) return; + if (pDnode->ntype == NODE_END) return; + taosThreadMutexLock(&pDnode->mutex); - if (pDnode->ptype == DND_PROC_PARENT) { - for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pWrapper->procType != DND_PROC_PARENT) continue; - if (pDnode->ntype == NODE_END) continue; + for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (pWrapper->procType != DND_PROC_PARENT) continue; - if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { - dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); - if (pWrapper->procObj) { - taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); - } - dmNewNodeProc(pWrapper, n); + if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { + dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); + if (pWrapper->procObj) { + taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); } + dmNewNodeProc(pWrapper, n); } } taosThreadMutexUnlock(&pDnode->mutex); diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 32205b337c..a162cbbf2f 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -381,7 +381,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { return -1; } - if (dmInitTrans(pDnode) != 0) { + if (dmInitServer(pDnode) != 0) { dError("failed to init transport since %s", terrstr()); return -1; } @@ -412,7 +412,8 @@ static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { } taosWUnLockLatch(&pDnode->data.latch); - dmCleanupTrans(pDnode); + dmCleanupClient(pDnode); + dmCleanupServer(pDnode); dInfo("dnode-mgmt is cleaned up"); } diff --git a/source/dnode/mgmt/implement/src/dmObj.c b/source/dnode/mgmt/implement/src/dmObj.c index 66bfb27016..a43439d465 100644 --- a/source/dnode/mgmt/implement/src/dmObj.c +++ b/source/dnode/mgmt/implement/src/dmObj.c @@ -124,9 +124,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { goto _OVER; } - pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]); - tmsgSetDefaultMsgCb(&pDnode->data.msgCb); - dInfo("dnode is created, data:%p", pDnode); code = 0; diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 03dc0045d9..45a71ee766 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "dmImp.h" +#include "qworker.h" + #define INTERNAL_USER "_dnd" #define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" @@ -130,22 +132,27 @@ _OVER: } static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; tmsg_t msgType = pMsg->msgType; bool isReq = msgType & 1u; - SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; + SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMgmtWrapper *pWrapper = pHandle->pNdWrapper; - if (msgType == TDMT_DND_SERVER_STATUS) { - dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); - dmProcessServerStatusReq(pDnode, pMsg); - return; - } - - if (msgType == TDMT_DND_NET_TEST) { - dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); - dmProcessNetTestReq(pDnode, pMsg); - return; + switch (msgType) { + case TDMT_DND_SERVER_STATUS: + dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); + dmProcessServerStatusReq(pDnode, pMsg); + return; + case TDMT_DND_NET_TEST: + dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); + dmProcessNetTestReq(pDnode, pMsg); + return; + case TDMT_MND_SYSTABLE_RETRIEVE_RSP: + case TDMT_VND_FETCH_RSP: + dTrace("retrieve rsp is received"); + qWorkerProcessFetchRsp(NULL, NULL, pMsg); + pMsg->pCont = NULL; // already freed in qworker + return; } if (pDnode->status != DND_STAT_RUNNING) { @@ -233,16 +240,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { return 0; } -static inline int32_t dmSendRpcReq(SDnode *pDnode, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pDnode->trans.clientRpc == NULL) { - terrno = TSDB_CODE_NODE_OFFLINE; - return -1; - } - - rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); - return 0; -} - static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { SEpSet epSet = {0}; dmGetMnodeEpSet(pDnode, &epSet); @@ -288,28 +285,20 @@ void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { } static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pWrapper->pDnode->status != DND_STAT_RUNNING) { + SDnode *pDnode = pWrapper->pDnode; + if (pDnode->status != DND_STAT_RUNNING) { terrno = TSDB_CODE_NODE_OFFLINE; dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); return -1; } - if (pWrapper->procType != DND_PROC_CHILD) { - return dmSendRpcReq(pWrapper->pDnode, pEpSet, pReq); - } else { - char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet)); - if (pHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - memcpy(pHead, pReq, sizeof(SRpcMsg)); - memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); - taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, - PROC_FUNC_REQ); - taosMemoryFree(pHead); - return 0; + if (pDnode->trans.clientRpc == NULL) { + terrno = TSDB_CODE_NODE_OFFLINE; + return -1; } + + rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); + return 0; } static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { @@ -396,9 +385,10 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t pMsg->pCont = pCont; if (ftype == PROC_FUNC_REQ) { + ASSERT(1); dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle, code, pMsg->ahandle); - dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + dmSendReq(pWrapper, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); } else if (ftype == PROC_FUNC_RSP) { dTrace("msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code, pMsg->ahandle); pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); @@ -421,23 +411,25 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t } SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) { - SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue, - .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->procShm, - .parent = pWrapper, - .name = pWrapper->name}; + SProcCfg cfg = { + .childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->procShm, + .parent = pWrapper, + .name = pWrapper->name, + }; return cfg; } -bool rpcRfp(int32_t code) { +static bool rpcRfp(int32_t code) { if (code == TSDB_CODE_RPC_REDIRECT) { return true; } else { @@ -445,7 +437,7 @@ bool rpcRfp(int32_t code) { } } -static int32_t dmInitClient(SDnode *pDnode) { +int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; @@ -471,11 +463,15 @@ static int32_t dmInitClient(SDnode *pDnode) { return -1; } + pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]); + tmsgSetDefaultMsgCb(&pDnode->data.msgCb); + dDebug("dnode rpc client is initialized"); + return 0; } -static void dmCleanupClient(SDnode *pDnode) { +void dmCleanupClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; if (pTrans->clientRpc) { rpcClose(pTrans->clientRpc); @@ -517,7 +513,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; @@ -543,7 +539,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s return rpcRsp.code; } -static int32_t dmInitServer(SDnode *pDnode) { +int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; @@ -569,7 +565,7 @@ static int32_t dmInitServer(SDnode *pDnode) { return 0; } -static void dmCleanupServer(SDnode *pDnode) { +void dmCleanupServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; if (pTrans->serverRpc) { rpcClose(pTrans->serverRpc); @@ -578,17 +574,6 @@ static void dmCleanupServer(SDnode *pDnode) { } } -int32_t dmInitTrans(SDnode *pDnode) { - if (dmInitServer(pDnode) != 0) return -1; - if (dmInitClient(pDnode) != 0) return -1; - return 0; -} - -void dmCleanupTrans(SDnode *pDnode) { - dmCleanupServer(pDnode); - dmCleanupClient(pDnode); -} - SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = { .sendReqFp = dmSendReq, @@ -598,6 +583,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { .releaseHandleFp = dmReleaseHandle, .reportStartupFp = dmReportStartupByWrapper, .pWrapper = pWrapper, + .clientRpc = pWrapper->pDnode->trans.clientRpc, }; return msgCb; } diff --git a/source/dnode/mgmt/interface/src/dmInt.c b/source/dnode/mgmt/interface/src/dmInt.c index 13a78ef52b..f8e23ad262 100644 --- a/source/dnode/mgmt/interface/src/dmInt.c +++ b/source/dnode/mgmt/interface/src/dmInt.c @@ -171,16 +171,17 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { } } -void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pRpc) { +void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("net test req is received"); - SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0}; - rsp.pCont = rpcMallocCont(pRpc->contLen); + SRpcMsg rsp = {.handle = pReq->handle, .refId = pReq->refId, .ahandle = pReq->ahandle, .code = 0}; + rsp.pCont = rpcMallocCont(pReq->contLen); if (rsp.pCont == NULL) { rsp.code = TSDB_CODE_OUT_OF_MEMORY; } else { - rsp.contLen = pRpc->contLen; + rsp.contLen = pReq->contLen; } rpcSendResponse(&rsp); + rpcFreeCont(pReq->pCont); } void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { @@ -208,6 +209,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { _OVER: rpcSendResponse(&rspMsg); + rpcFreeCont(pReq->pCont); } void dmGetMonitorSysInfo(SMonSysInfo *pInfo) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4dcabcbc19..98559f974d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -320,7 +320,6 @@ typedef struct SExchangeInfo { SArray* pSourceDataInfo; tsem_t ready; void* pTransporter; - SMsgCb* pMsgCb; SSDataBlock* pResult; bool seqLoadData; // sequential load data or not, false by default int32_t current; @@ -661,8 +660,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createExchangeOperatorInfo(SMsgCb *pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); - SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d4fb14f6fc..ccfb3b9512 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3259,7 +3259,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, +SOperatorInfo* createExchangeOperatorInfo(void *pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3303,7 +3303,7 @@ SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSour pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); - pInfo->pMsgCb = pMsgCb; + pInfo->pTransporter = pTransporter; return pOperator; @@ -4770,7 +4770,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); - return createExchangeOperatorInfo(pHandle->pMsgCb, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); + return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 819340ec6c..eaacb561d5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1057,7 +1057,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pMsgSendInfo->fp = loadSysTableCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb, NULL, &pInfo->epSet, &transporterId, pMsgSendInfo); + int32_t code = + asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); tsem_wait(&pInfo->ready); if (pTaskInfo->code) { -- GitLab