From 36fe62fbd2684c22c55592dac419a1465b834e0a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 16 May 2022 23:23:49 +0800 Subject: [PATCH] refactor: make more object global --- include/common/tmsgcb.h | 38 +++++------ include/libs/transport/trpc.h | 2 +- source/common/src/tmsgcb.c | 39 ++++------- source/dnode/mgmt/mgmt_bnode/src/bmInt.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmInt.c | 2 +- source/dnode/mgmt/mgmt_snode/src/smInt.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 9 ++- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 14 +++- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 6 +- source/dnode/mgmt/node_mgmt/src/dmNodes.c | 4 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 65 ++++++++++--------- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/impl/test/trans/trans2.cpp | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/qworker/src/qworker.c | 10 +-- source/libs/qworker/src/qworkerMsg.c | 4 +- source/libs/qworker/test/qworkerTests.cpp | 16 ++--- source/libs/stream/src/tstream.c | 6 +- 21 files changed, 116 insertions(+), 115 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 7d5cabccb3..679e3ba775 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -22,9 +22,10 @@ extern "C" { #endif -typedef struct SRpcMsg SRpcMsg; -typedef struct SEpSet SEpSet; -typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SRpcMsg SRpcMsg; +typedef struct SEpSet SEpSet; +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SRpcHandleInfo SRpcHandleInfo; typedef enum { QUERY_QUEUE, @@ -37,19 +38,17 @@ typedef enum { QUEUE_MAX, } EQueueType; -typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq); -typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); -typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); -typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); -typedef void (*SendRspFp)(const SRpcMsg* pRsp); -typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet); -typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); -typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); +typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg); +typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); +typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); +typedef void (*SendRspFp)(const SRpcMsg* pMsg); +typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); +typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); +typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReportStartup)(const char* name, const char* desc); typedef struct { - SMgmtWrapper* pWrapper; - void* pMgmt; + void* mgmt; void* clientRpc; PutToQueueFp queueFps[QUEUE_MAX]; GetQueueSizeFp qsizeFp; @@ -62,14 +61,13 @@ typedef struct { } SMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); -int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); -void tmsgSendRsp(SRpcMsg* pRsp); -void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp); -void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet); -void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); -void tmsgReleaseHandle(void* handle, int8_t type); +int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); +void tmsgSendRsp(const SRpcMsg* pMsg); +void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet); +void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); +void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReportStartup(const char* name, const char* desc); #ifdef __cplusplus diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d8cdcacdb1..b6864bd38d 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -36,7 +36,7 @@ typedef struct { char user[TSDB_USER_LEN]; } SRpcConnInfo; -typedef struct { +typedef struct SRpcHandleInfo { // rpc info void *handle; // rpc handle returned to app int64_t refId; // refid, used by server diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 43e0b87beb..d28e2c675d 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -22,51 +22,38 @@ static SMsgCb tsDefaultMsgCb; void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { - // cannot be empty, but not checked for faster detect PutToQueueFp fp = pMsgCb->queueFps[qtype]; - return (*fp)(pMsgCb->pMgmt, pReq); + return (*fp)(pMsgCb->mgmt, pReq); } int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { - // cannot be empty, but not checked for faster detect GetQueueSizeFp fp = pMsgCb->qsizeFp; - return (*fp)(pMsgCb->pMgmt, vgId, qtype); + return (*fp)(pMsgCb->mgmt, vgId, qtype); } -int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { - // cannot be empty, but not checked for faster detect - SendReqFp fp = pMsgCb->sendReqFp; - return (*fp)(pMsgCb->pWrapper, epSet, pReq); +int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) { + SendReqFp fp = tsDefaultMsgCb.sendReqFp; + return (*fp)(epSet, pReq); } -void tmsgSendRsp(SRpcMsg* pMsg) { - // cannot be empty, but not checked for faster detect +void tmsgSendRsp(const SRpcMsg* pMsg) { SendRspFp fp = tsDefaultMsgCb.sendRspFp; return (*fp)(pMsg); } -void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) { - // cannot be empty, but not checked for faster detect +void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) { SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; - (*fp)(pRsp, pNewEpSet); + (*fp)(pMsg, pNewEpSet); } -void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { - RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp; - if (fp != NULL) { - (*fp)(pMsgCb->pWrapper, pMsg); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } +void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { + RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp; + (*fp)(pMsg); } -void tmsgReleaseHandle(void* handle, int8_t type) { +void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp; - if (fp != NULL) { - (*fp)(tsDefaultMsgCb.pWrapper, handle, type); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } + (*fp)(pHandle, type); } void tmsgReportStartup(const char* name, const char* desc) { diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index 1fd3aab1d9..2c5d23cae9 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -43,7 +43,7 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; pMgmt->msgCb = pInput->msgCb; - pMgmt->msgCb.pMgmt = pMgmt; + pMgmt->msgCb.mgmt = pMgmt; SBnodeOpt option = {0}; bmInitOption(pMgmt, &option); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 8445889954..a24334550b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -136,7 +136,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; - pMgmt->msgCb.pMgmt = pMgmt; + pMgmt->msgCb.mgmt = pMgmt; bool deployed = false; if (mmReadFile(pMgmt, &deployed) != 0) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index a40f95041b..06c18ab288 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -46,7 +46,7 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize; - pMgmt->msgCb.pMgmt = pMgmt; + pMgmt->msgCb.mgmt = pMgmt; SQnodeOpt option = {0}; qmInitOption(pMgmt, &option); diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 25d632d565..971a6ac4c7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -44,7 +44,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; pMgmt->msgCb = pInput->msgCb; - pMgmt->msgCb.pMgmt = pMgmt; + pMgmt->msgCb.mgmt = pMgmt; SSnodeOpt option = {0}; smInitOption(pMgmt, &option); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 287d49c4f5..ab41ee5df3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -258,7 +258,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue; pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; - pMgmt->msgCb.pMgmt = pMgmt; + pMgmt->msgCb.mgmt = pMgmt; taosInitRWLatch(&pMgmt->latch); SDiskCfg dCfg = {0}; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 51b518653f..431c52ef95 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -65,7 +65,6 @@ typedef struct { } SProc; typedef struct SMgmtWrapper { - struct SDnode *pDnode; SMgmtFunc func; void *pMgmt; const char *name; @@ -124,8 +123,12 @@ typedef struct SDnode { SMgmtWrapper wrappers[NODE_END]; } SDnode; -// dmEmv.c -void dmReportStartup(const char *pName, const char *pDesc); +// dmEnv.c +SDnode *dmInstance(); +bool dmNotRunning(); +void dmReportStartup(const char *pName, const char *pDesc); +void *dmGetClientRpc(); +void dmGetMnodeEpSetGlobal(SEpSet *pEpSet); // dmMgmt.c int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index d507340950..cc2c7fa815 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static SDnode global = {0}; +SDnode global = {0}; static int32_t dmCheckRepeatInit(SDnode *pDnode) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { @@ -166,10 +166,12 @@ static bool dmIsNodeRequired(EDndNodeType ntype) { } SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { + SDnode *pDnode = dmInstance(); + SMgmtInputOpt opt = { .path = pWrapper->path, .name = pWrapper->name, - .pData = &pWrapper->pDnode->data, + .pData = &pDnode->data, .processCreateNodeFp = dmProcessCreateNodeReq, .processDropNodeFp = dmProcessDropNodeReq, .isNodeRequiredFp = dmIsNodeRequired, @@ -185,3 +187,11 @@ void dmReportStartup(const char *pName, const char *pDesc) { tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); dDebug("step:%s, %s", pStartup->name, pStartup->desc); } + +SDnode *dmInstance() { return &global; } + +bool dmNotRunning() { return global.status != DND_STAT_RUNNING; } + +void *dmGetClientRpc() { return global.trans.clientRpc; } + +void dmGetMnodeEpSetGlobal(SEpSet *pEpSet) { dmGetMnodeEpSet(&global.data, pEpSet); } \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index b8cb147ac4..a3de98b7ee 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static bool dmRequireNode(SMgmtWrapper *pWrapper) { +static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); bool required = false; @@ -25,7 +25,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, does not require startup", pWrapper->name); } - if (pWrapper->ntype == DNODE && pWrapper->pDnode->rtype != DNODE && pWrapper->pDnode->rtype != NODE_END) { + if (pWrapper->ntype == DNODE && pDnode->rtype != DNODE && pDnode->rtype != NODE_END) { required = false; dDebug("node:%s, does not require startup in child process", pWrapper->name); } @@ -150,7 +150,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { goto _OVER; } - pWrapper->required = dmRequireNode(pWrapper); + pWrapper->required = dmRequireNode(pDnode, pWrapper); if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) { dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr()); diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 10b6db52c0..d8bb8126e3 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -64,6 +64,8 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { } int32_t dmOpenNode(SMgmtWrapper *pWrapper) { + SDnode *pDnode = dmInstance(); + if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); @@ -101,7 +103,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr()); return -1; } - if (pWrapper->pDnode->rtype == NODE_END) { + if (pDnode->rtype == NODE_END) { dInfo("node:%s, should be started manually in child process", pWrapper->name); } else { if (dmNewProc(pWrapper, pWrapper->ntype) != 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 4a32bc710a..3bf9993c99 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -194,9 +194,9 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { return 0; } -static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { +static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { SEpSet epSet = {0}; - dmGetMnodeEpSet(&pDnode->data, &epSet); + dmGetMnodeEpSetGlobal(&epSet); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { @@ -221,15 +221,8 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { rpcSendResponse(&rsp); } -static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { - if (pRsp->code == TSDB_CODE_NODE_REDIRECT) { - dmSendRpcRedirectRsp(pDnode, pRsp); - } else { - rpcSendResponse(pRsp); - } -} - -static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { +static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { + SDnode *pDnode = dmInstance(); if (pDnode->status != DND_STAT_RUNNING) { pRsp->code = TSDB_CODE_NODE_OFFLINE; rpcFreeCont(pReq->pCont); @@ -239,18 +232,18 @@ static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRp } } -static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - SDnode *pDnode = pWrapper->pDnode; - if (pDnode->status != DND_STAT_RUNNING || pDnode->trans.clientRpc == NULL) { +static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { + SDnode *pDnode = dmInstance(); + if (pDnode->status != DND_STAT_RUNNING) { rpcFreeCont(pReq->pCont); pReq->pCont = NULL; terrno = TSDB_CODE_NODE_OFFLINE; dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); return -1; + } else { + rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); + return 0; } - - rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); - return 0; } static inline void dmSendRsp(const SRpcMsg *pMsg) { @@ -258,7 +251,11 @@ static inline void dmSendRsp(const SRpcMsg *pMsg) { if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP); } else { - dmSendRpcRsp(pWrapper->pDnode, pMsg); + if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { + dmSendRpcRedirectRsp(pMsg); + } else { + rpcSendResponse(pMsg); + } } } @@ -280,7 +277,9 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe } } -static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { +static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { + SMgmtWrapper *pWrapper = pMsg->info.wrapper; + if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); } else { @@ -288,12 +287,14 @@ static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg } } -static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { +static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { + SMgmtWrapper *pWrapper = pHandle->wrapper; + if (InChildProc(pWrapper->proc.ptype)) { - SRpcMsg msg = {.info.handle = handle, .code = type}; + SRpcMsg msg = {.info = *pHandle, .code = type}; dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); } else { - rpcReleaseHandle(handle, type); + rpcReleaseHandle(pHandle->handle, type); } } @@ -385,7 +386,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SEpSet epSet = {0}; dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); dmGetMnodeEpSet(&pDnode->data, &epSet); - dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp); + dmSendRecv(&epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { terrno = rpcRsp.code; @@ -441,15 +442,15 @@ void dmCleanupServer(SDnode *pDnode) { } SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { - SMsgCb msgCb = { - .pWrapper = pWrapper, - .clientRpc = pWrapper->pDnode->trans.clientRpc, - .sendReqFp = dmSendReq, - .sendRspFp = dmSendRsp, - .sendRedirectRspFp = dmSendRedirectRsp, - .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, - .releaseHandleFp = dmReleaseHandle, - .reportStartupFp = dmReportStartup, + SDnode *pDnode = dmInstance(); + SMsgCb msgCb = { + .clientRpc = dmInstance()->trans.clientRpc, + .sendReqFp = dmSendReq, + .sendRspFp = dmSendRsp, + .sendRedirectRspFp = dmSendRedirectRsp, + .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, + .releaseHandleFp = dmReleaseHandle, + .reportStartupFp = dmReportStartup, }; return msgCb; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 20ac8da713..5fdd2f1842 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -625,7 +625,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); - tmsgSendReq(&pMnode->msgCb, &epSet, &rpcMsg); + tmsgSendReq(&epSet, &rpcMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 0bc9eb5378..7bd1dd80fb 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -985,7 +985,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr } memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - if (tmsgSendReq(&pMnode->msgCb, &pAction->epSet, &rpcMsg) == 0) { + if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) { mDebug("trans:%d, action:%d is sent", pTrans->id, action); pAction->msgSent = 1; pAction->msgReceived = 0; diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index e33aecee8a..264a6ef633 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -47,7 +47,7 @@ class MndTestTrans2 : public ::testing::Test { static void InitMnode() { static SMsgCb msgCb = {0}; msgCb.reportStartupFp = reportStartup; - msgCb.pWrapper = (SMgmtWrapper *)(&msgCb); // hack + msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefaultMsgCb(&msgCb); SMnodeOpt opt = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 38ce9b88dc..a93844c5ff 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -73,7 +73,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { SMsgCb *pMsgCb = rpcHandle; if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { pMsg->info.noResp = 1; - tmsgSendReq(rpcHandle, pEpSet, pMsg); + tmsgSendReq(pEpSet, pMsg); } else { vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 403a6a734f..adf3588bd1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -412,7 +412,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.refId = -1; @@ -1278,7 +1278,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re QW_LOCK(QW_WRITE, &sch->hbConnLock); if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { - tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); sch->hbConnInfo.handle = NULL; sch->hbConnInfo.ahandle = NULL; @@ -1310,7 +1310,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { - tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); } memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); @@ -1330,7 +1330,7 @@ _return: qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { - tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); } QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); @@ -1498,7 +1498,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { - if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { + if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index f7549de71d..72eead724e 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -294,7 +294,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { .info = *pConn, }; - tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); + tmsgRegisterBrokenLinkArg(&pMsg); return TSDB_CODE_SUCCESS; } @@ -328,7 +328,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * .info = *pConn, }; - tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); + tmsgRegisterBrokenLinkArg(&pMsg); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 5a57a47df8..b573828e76 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -959,7 +959,7 @@ TEST(seqTest, normalCase) { stubSetGetDataBlock(); SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1001,7 +1001,7 @@ TEST(seqTest, cancelFirst) { stubSetRpcSendResponse(); SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1050,7 +1050,7 @@ TEST(seqTest, randCase) { taosSeedRand(taosGetTimestampSec()); SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1124,7 +1124,7 @@ TEST(seqTest, multithreadRand) { taosSeedRand(taosGetTimestampSec()); SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1188,7 +1188,7 @@ TEST(rcTest, shortExecshortDelay) { qwtTestQuitThreadNum = 0; SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1272,7 +1272,7 @@ TEST(rcTest, longExecshortDelay) { qwtTestQuitThreadNum = 0; SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1358,7 +1358,7 @@ TEST(rcTest, shortExeclongDelay) { qwtTestQuitThreadNum = 0; SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); @@ -1442,7 +1442,7 @@ TEST(rcTest, dropTest) { taosSeedRand(taosGetTimestampSec()); SMsgCb msgCb = {0}; - msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; + msgCb.mgmt = (void *)mockPointer; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); ASSERT_EQ(code, 0); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 743fbd0e9f..baeb404538 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -111,7 +111,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb ASSERT(0); return -1; } - tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); + tmsgSendReq(pEpSet, &dispatchMsg); } return 0; } @@ -371,7 +371,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat return -1; } - tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); + tmsgSendReq(pEpSet, &dispatchMsg); } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -571,7 +571,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in return -1; } - tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); + tmsgSendReq(pEpSet, &dispatchMsg); } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); -- GitLab