diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 43a879fd8d3347e13ad8db167ce8c797c8d51f3b..996e09863527fcb40d676e5c8c83e5fa821d3590 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2271,10 +2271,11 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p struct SRpcMsg; struct SEpSet; -typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq); -typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg); -typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg); +struct SMgmtWrapper; +typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); +typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg); +typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); +typedef int32_t (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); #ifdef __cplusplus } diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 7d47a5ddf7e832683d03a0fb5a9a381af5e18ed3..4f2a9c85006d7f92237ec14f9bcd59e8c505d9a2 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -21,24 +21,21 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SDnode SDnode; -typedef struct SBnode SBnode; -typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg); -typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg); -typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg); +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SBnode SBnode; typedef struct { int64_t numOfErrors; } SBnodeLoad; typedef struct { - int32_t sver; - int32_t dnodeId; - int64_t clusterId; - SDnode *pDnode; - SendReqToDnodeFp sendReqFp; - SendReqToMnodeFp sendReqToMnodeFp; - SendRedirectRspFp sendRedirectRspFp; + int32_t sver; + int32_t dnodeId; + int64_t clusterId; + SMgmtWrapper *pWrapper; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SBnodeOpt; /* ------------------------ SBnode ------------------------ */ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 150f260841f8fb6a8fdea35d7ba066df3ceaf61a..9f242f73933b2f9c30a7bf9668ca461545d36a54 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -23,13 +23,8 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SDnode SDnode; -typedef struct SMnode SMnode; -typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SMnode SMnode; typedef struct { int32_t dnodeId; @@ -37,11 +32,12 @@ typedef struct { int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; - SDnode *pDnode; - PutToQueueFp putReqToMWriteQFp; - PutToQueueFp putReqToMReadQFp; + SMgmtWrapper *pWrapper; + PutToQueueFp putToWriteQFp; + PutToQueueFp putToReadQFp; SendReqFp sendReqFp; - SendMnodeReqFp sendReqToMnodeFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 1ab27bb1ebe04376bce1d297eeb78c86edf2553e..69297270b9b130d0a3c1d2b42636b8f9fde2612e 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -21,11 +21,8 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SDnode SDnode; -typedef struct SQnode SQnode; -typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg); -typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg); -typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg); +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SQnode SQnode; typedef struct { int64_t numOfStartTask; @@ -39,13 +36,13 @@ typedef struct { } SQnodeLoad; typedef struct { - int32_t sver; - int32_t dnodeId; - int64_t clusterId; - SDnode *pDnode; - SendReqToDnodeFp sendReqFp; - SendReqToMnodeFp sendReqToMnodeFp; - SendRedirectRspFp sendRedirectRspFp; + int32_t sver; + int32_t dnodeId; + int64_t clusterId; + SMgmtWrapper *pWrapper; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SQnodeOpt; /* ------------------------ SQnode ------------------------ */ diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 0e82b8cbf5bc721068288d354f80c2d424dcac39..302ffd62c0ef9e710f8b5cca50031f91e4fb2c0f 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -24,24 +24,21 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SDnode SDnode; -typedef struct SSnode SSnode; -typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg); -typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg); -typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg); +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SSnode SSnode; typedef struct { int64_t numOfErrors; } SSnodeLoad; typedef struct { - int32_t sver; - int32_t dnodeId; - int64_t clusterId; - SDnode *pDnode; - SendReqToDnodeFp sendReqFp; - SendReqToMnodeFp sendReqToMnodeFp; - SendRedirectRspFp sendRedirectRspFp; + int32_t sver; + int32_t dnodeId; + int64_t clusterId; + SMgmtWrapper *pWrapper; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SSnodeOpt; /* ------------------------ SSnode ------------------------ */ diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index 6bae56939f0d188a4bc77b83139e9dfb681dec23..e6cca1b02ce45493a1d21f1612238f527d92d889 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -177,7 +177,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) { static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; - pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->dnodeId = pDnode->dnodeId; pOption->clusterId = pDnode->clusterId; diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index db334206315aa9ad6ba9494f202b3bd4b2a4fcd7..27f0886e3a8e96c00a145a0039339988431371aa 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -149,9 +149,9 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndSendMonitorReport(SDnode *pDnode); -int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg); -void dndSendRsp(void *pWrapper, SRpcMsg *pRsp); +int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); +void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp); diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 91bf0eda870c0dd9103286180d595d61e8aaf45a..e3bd4d6219655a56f9f0454c0e1614151c61929a 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -279,9 +279,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { return 0; } -int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { - SMgmtWrapper *pWrapper = wrapper; - +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { if (pWrapper->procType == PROC_CHILD) { } else { STransMgmt *pTrans = &pWrapper->pDnode->trans; @@ -289,9 +287,7 @@ int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { } } -int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { - SMgmtWrapper *pWrapper = wrapper; - +int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { if (pWrapper->procType == PROC_CHILD) { } else { SDnode *pDnode = pWrapper->pDnode; @@ -311,9 +307,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { } } -void dndSendRsp(void *wrapper, SRpcMsg *pRsp) { - SMgmtWrapper *pWrapper = wrapper; - +void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pWrapper->procType == PROC_CHILD) { int32_t code = -1; do { diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index dbbc8f041177e2c5302946bdd7d0488592fe5873..8d56b48636c93a72d0cf62579ea9a77c1a8ce914 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -30,7 +30,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.updateTime = pMgmt->updateTime; req.numOfCores = tsNumOfCores; req.numOfSupportVnodes = pDnode->numOfSupportVnodes; - memcpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN); + tstrncpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 2d79d560a08469b8db4d60b132184a24b3187f0b..2f7f3135969f6c690c7c020c365f6b399700426e 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -62,8 +62,8 @@ int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpcMsg); -int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 75237aa912fc578a19a6d13859df813d5177c669..4305e913b21946ea80d14a511b9cb2a4ac126bbe 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -135,11 +135,11 @@ int32_t mmDrop(SMnodeMgmt *pMgmt) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { SDnode *pDnode = pMgmt->pDnode; - pOption->pDnode = pDnode; + pOption->pWrapper = pMgmt->pWrapper; pOption->sendReqFp = dndSendReqToDnode; - pOption->sendReqToMnodeFp = dndSendReqToMnode; - pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; - pOption->putReqToMReadQFp = mmPutMsgToReadQueue; + pOption->sendMnodeReqFp = dndSendReqToMnode; + pOption->putToWriteQFp = mmPutMsgToWriteQueue; + pOption->putToReadQFp = mmPutMsgToReadQueue; pOption->dnodeId = pDnode->dnodeId; pOption->clusterId = pDnode->clusterId; } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 68dc1c12596643bbe83f91d143d5198987e7cc66..e68faa88b42628c6234f235e0ed0f87a71ef63cd 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -17,7 +17,7 @@ #include "mmInt.h" static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed", pMsg); + dTrace("msg:%p, will be processed in mnode queue", pMsg); SMnode *pMnode = mmAcquire(pMgmt); SRpcMsg *pRpc = &pMsg->rpcMsg; bool isReq = (pRpc->msgType & 1U); @@ -102,7 +102,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -111,7 +111,7 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker dTrace("msg:%p, is created", pMsg); pMsg->rpcMsg = *pRpc; - int32_t code = mmPutMsgToWorker(pWrapper->pMgmt, pWorker, pMsg); + int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -121,16 +121,12 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker return code; } -int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpc) { - // SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - // SMnodeMgmt *pMgmt = pWrapper->pMgmt; - // return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc); - return 0; +int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(pMgmt, &pMgmt->writeWorker, pRpc); } -int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpc) { - // SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - // SMnodeMgmt *pMgmt = pWrapper->pMgmt; - // return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc); - return 0; +int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); } diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c index ddd0043e22e8b2a8bdd781b28db654f8c21dad42..e9edb28e3c24c47dd0406f8917a8c88e47244f7b 100644 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ b/source/dnode/mgmt/qnode/src/qmMgmt.c @@ -184,7 +184,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) { static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; - pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->dnodeId = pDnode->dnodeId; pOption->clusterId = pDnode->clusterId; diff --git a/source/dnode/mgmt/snode/src/smMgmt.c b/source/dnode/mgmt/snode/src/smMgmt.c index 6af9e68ef4db97aeedfda9aeee3d77f3c37599e3..52152975a0010a01d86ed9a358887f1af66d269f 100644 --- a/source/dnode/mgmt/snode/src/smMgmt.c +++ b/source/dnode/mgmt/snode/src/smMgmt.c @@ -209,7 +209,7 @@ static void dndStopSnodeWorker(SDnode *pDnode) { static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; - pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->dnodeId = pDnode->dnodeId; pOption->clusterId = pDnode->clusterId; diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 0869ba0d551dd5e7d618dd16bbbcbc013811fa93..2b6ffcb54472b04767245a9d9245627bd434b167 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -34,7 +34,6 @@ typedef struct SVnodesMgmt { SWWorkerPool syncPool; SWWorkerPool writePool; const char *path; - SMnode *pMnode; SDnode *pDnode; SMgmtWrapper *pWrapper; SDnodeWorker mgmtWorker; @@ -100,8 +99,8 @@ void vmStopWorker(SVnodesMgmt *pMgmt); int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg); +int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 0d78223dea6456691bbb1da14e1d835f1b6ef58e..bcdb821c7054d486254073e861a18f0ba7400654 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -257,10 +257,6 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { dInfo("vnodes-mgmt is cleaned up"); } -static int32_t vmSendReqToDnode(SVnodesMgmt *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { - return dndSendReqToDnode(pMgmt->pDnode, epSet, rpcMsg); -} - static int32_t vmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); @@ -298,8 +294,8 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { } vnodeOpt.nthreads = tsNumOfCommitThreads; - vnodeOpt.putToQueryQFp = (VndPutToQueryQFp)vmPutMsgToQueryQueue; - vnodeOpt.sendReqFp = (VndSendReqFp)vmSendReqToDnode; + vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue; + vnodeOpt.sendReqFp = dndSendReqToDnode; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index ee0a4b216e77f9de4b4badbef06c79142c7cd9b0..017c127a6d7b5d10462061a4241406037df17407 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -146,7 +146,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } } -int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) { +int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + int32_t code = -1; SMsgHead *pHead = pRpc->pCont; // pHead->vgId = htonl(pHead->vgId); @@ -162,11 +164,20 @@ int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pRpc) { return code; } -int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + + int32_t code = -1; + SMsgHead *pHead = pRpc->pCont; + // pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg != NULL) { + code = vmWriteMsgToQueue(pVnode->pApplyQ, pMsg, false); + } vmReleaseVnode(pMgmt, pVnode); return code; } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 7b4a0233e2886feeb889e18735af3a5a536de921..88dd3a5c22c534d186264c555a7097dc8aaa8664 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -110,7 +110,7 @@ typedef struct SMnode { char *path; int64_t checkTime; SSdb *pSdb; - SDnode *pDnode; + SMgmtWrapper *pWrapper; SArray *pSteps; SShowMgmt showMgmt; SProfileMgmt profileMgmt; @@ -120,9 +120,9 @@ typedef struct SMnode { SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; SendReqFp sendReqFp; - SendMnodeReqFp sendReqToMnodeFp; - PutToQueueFp putReqToMWriteQFp; - PutToQueueFp putReqToMReadQFp; + SendMnodeReqFp sendMnodeReqFp; + PutToQueueFp putToWriteQFp; + PutToQueueFp putToReadQFp; } SMnode; int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 26c72fe1a37987b3ea6cce83d6780902d61a23b9..25ec5f7cd470700f140d331f5b60ac362a11fcf1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg), }; - pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + (*pMnode->putToWriteQFp)(pMnode->pWrapper, &rpcMsg); } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index c0d7e4ad355ad16a2c364226b0ae4845d689eff0..22e797e1b843c1be339bfe3c5b148d164ec40b05 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -49,7 +49,7 @@ int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { return -1; } - return (*pMnode->sendReqFp)(pMnode->pDnode, pEpSet, pMsg); + return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg); } int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { @@ -58,7 +58,7 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { return -1; } - return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg); + return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg); } static void *mndBuildTimerMsg(int32_t *pContLen) { @@ -80,7 +80,7 @@ static void mndPullupTrans(void *param, void *tmrId) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; - pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg); } taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); @@ -96,7 +96,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { .pCont = pReq, .contLen = contLen, }; - pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); + pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); } taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); @@ -108,7 +108,7 @@ static void mndPullupTelem(void *param, void *tmrId) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; - pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); + pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); } taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); @@ -286,14 +286,14 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->replica = pOption->replica; pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); - pMnode->pDnode = pOption->pDnode; - pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; - pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp; + pMnode->pWrapper = pOption->pWrapper; + pMnode->putToWriteQFp = pOption->putToWriteQFp; + pMnode->putToReadQFp = pOption->putToReadQFp; pMnode->sendReqFp = pOption->sendReqFp; - pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; + pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp; - if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || - pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { + if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL || + pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fbd1b66e276f882d6720da9c5c280ae45c98cfbb..d318732bfc4427ef4000f53a86c82810058f8164 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -30,10 +30,8 @@ extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SVnode SVnode; -typedef int32_t (*VndPutToQueryQFp)(void *pMgmt, struct SRpcMsg *pReq); -typedef int32_t (*VndSendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); - +typedef struct SMgmtWrapper SMgmtWrapper; +typedef struct SVnode SVnode; typedef struct { // TODO int32_t reserved; @@ -62,9 +60,11 @@ typedef struct { } SVnodeCfg; typedef struct { - uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) - VndPutToQueryQFp putToQueryQFp; - VndSendReqFp sendReqFp; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + PutToQueueFp putToQueryQFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SVnodeOpt; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 60ba0a3b71b61f0d2826344850377ffa22a44dd4..ebdc44ca6d4ce2e2b0e78a88cc6a069420a92ea3 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -53,8 +53,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt - VndPutToQueryQFp putToQueryQFp; - VndSendReqFp sendReqFp; + PutToQueueFp putToQueryQFp; + SendReqFp sendReqFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr;