diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 431c52ef95af53338c80df97ea27fefc974c053c..f9baa85b6316d4bbb5a3eee96b2a533e51ecc9c4 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -66,6 +66,7 @@ typedef struct { typedef struct SMgmtWrapper { SMgmtFunc func; + struct SDnode *pDnode; void *pMgmt; const char *name; char *path; @@ -125,10 +126,7 @@ typedef struct SDnode { // dmEnv.c SDnode *dmInstance(); -bool dmNotRunning(); void dmReportStartup(const char *pName, const char *pDesc); -void *dmGetClientRpc(); -void dmGetMnodeEpSetGlobal(SEpSet *pEpSet); // dmMgmt.c int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype); @@ -164,7 +162,7 @@ int32_t dmInitServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode); -SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); +SMsgCb dmGetMsgcb(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index cc2c7fa8152e1fa748b397dd78c92ff7d43c9267..8726fea9a0b9d738e79a70a477f900f75cb30ca8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -SDnode global = {0}; +static SDnode global = {0}; + +SDnode *dmInstance() { return &global; } static int32_t dmCheckRepeatInit(SDnode *pDnode) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { @@ -49,10 +51,10 @@ static int32_t dmInitMonitor() { int32_t dmInit(int8_t rtype) { dInfo("start to init env"); - if (dmCheckRepeatInit(&global) != 0) return -1; + if (dmCheckRepeatInit(dmInstance()) != 0) return -1; if (dmInitSystem() != 0) return -1; if (dmInitMonitor() != 0) return -1; - if (dmInitDnode(&global, rtype) != 0) return -1; + if (dmInitDnode(dmInstance(), rtype) != 0) return -1; dInfo("env is initialized"); return 0; @@ -69,7 +71,7 @@ static int32_t dmCheckRepeatCleanup(SDnode *pDnode) { void dmCleanup() { dDebug("start to cleanup env"); if (dmCheckRepeatCleanup != 0) return; - dmCleanupDnode(&global); + dmCleanupDnode(dmInstance()); monCleanup(); syncCleanUp(); walCleanUp(); @@ -83,17 +85,17 @@ void dmCleanup() { } void dmStop() { - SDnode *pDnode = &global; + SDnode *pDnode = dmInstance(); pDnode->stop = true; } int32_t dmRun() { - SDnode *pDnode = &global; + SDnode *pDnode = dmInstance(); return dmRunDnode(pDnode); } static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { - SDnode *pDnode = &global; + SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { @@ -130,7 +132,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { } static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { - SDnode *pDnode = &global; + SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { @@ -161,37 +163,27 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { } static bool dmIsNodeRequired(EDndNodeType ntype) { - SDnode *pDnode = &global; + SDnode *pDnode = dmInstance(); return pDnode->wrappers[ntype].required; } SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { - SDnode *pDnode = dmInstance(); - SMgmtInputOpt opt = { .path = pWrapper->path, .name = pWrapper->name, - .pData = &pDnode->data, + .pData = &pWrapper->pDnode->data, .processCreateNodeFp = dmProcessCreateNodeReq, .processDropNodeFp = dmProcessDropNodeReq, .isNodeRequiredFp = dmIsNodeRequired, }; - opt.msgCb = dmGetMsgcb(pWrapper); + opt.msgCb = dmGetMsgcb(pWrapper->pDnode); return opt; } void dmReportStartup(const char *pName, const char *pDesc) { - SStartupInfo *pStartup = &global.startup; + SStartupInfo *pStartup = &(dmInstance()->startup); tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); dDebug("step:%s, %s", pStartup->name, pStartup->desc); } - -SDnode *dmInstance() { return &global; } - -bool dmNotRunning() { return global.status != DND_STAT_RUNNING; } - -void *dmGetClientRpc() { return global.trans.clientRpc; } - -void dmGetMnodeEpSetGlobal(SEpSet *pEpSet) { dmGetMnodeEpSet(&global.data, pEpSet); } \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index a3de98b7eef9f5b5fa333ddd660dd649582c44aa..96a9b67c90f7eb4d48eafb4ce18cb1ddff39f52c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -132,6 +132,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + pWrapper->pDnode = pDnode; pWrapper->name = dmNodeName(ntype); pWrapper->ntype = ntype; pWrapper->proc.wrapper = pWrapper; diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index d8bb8126e3ceace6dff8962b92f745354e32f000..2bc5819df265f0f7511932af5f257ab1f078db18 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -64,7 +64,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { } int32_t dmOpenNode(SMgmtWrapper *pWrapper) { - SDnode *pDnode = dmInstance(); + SDnode *pDnode = pWrapper->pDnode; if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3bf9993c99865db65e7f001000bfde128e1684cc..414e00f7b4de58c558e3b37eeeaf8687ab462ef7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -195,8 +195,9 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { } static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { - SEpSet epSet = {0}; - dmGetMnodeEpSetGlobal(&epSet); + SDnode *pDnode = dmInstance(); + SEpSet epSet = {0}; + dmGetMnodeEpSet(&pDnode->data, &epSet); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { @@ -279,7 +280,6 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); } else { @@ -289,22 +289,15 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { SMgmtWrapper *pWrapper = pHandle->wrapper; - if (InChildProc(pWrapper->proc.ptype)) { - SRpcMsg msg = {.info = *pHandle, .code = type}; + SRpcMsg msg = {.code = type, .info = *pHandle}; dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); } else { rpcReleaseHandle(pHandle->handle, type); } } -static bool rpcRfp(int32_t code) { - if (code == TSDB_CODE_RPC_REDIRECT) { - return true; - } else { - return false; - } -} +static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; } int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -345,8 +338,7 @@ void dmCleanupClient(SDnode *pDnode) { } } -static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, - char *ckey) { +static inline int32_t dmGetHideUserAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t code = 0; char pass[TSDB_PASSWORD_LEN + 1] = {0}; @@ -370,7 +362,7 @@ static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, c static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - if (dmGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) { + if (dmGetHideUserAuth(user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } @@ -410,7 +402,6 @@ int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); rpcInit.localPort = tsServerPort; rpcInit.label = "DND"; @@ -441,16 +432,15 @@ void dmCleanupServer(SDnode *pDnode) { } } -SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { - SDnode *pDnode = dmInstance(); - SMsgCb msgCb = { - .clientRpc = dmInstance()->trans.clientRpc, - .sendReqFp = dmSendReq, - .sendRspFp = dmSendRsp, - .sendRedirectRspFp = dmSendRedirectRsp, - .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, - .releaseHandleFp = dmReleaseHandle, - .reportStartupFp = dmReportStartup, +SMsgCb dmGetMsgcb(SDnode *pDnode) { + SMsgCb msgCb = { + .clientRpc = pDnode->trans.clientRpc, + .sendReqFp = dmSendReq, + .sendRspFp = dmSendRsp, + .sendRedirectRspFp = dmSendRedirectRsp, + .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, + .releaseHandleFp = dmReleaseHandle, + .reportStartupFp = dmReportStartup, }; return msgCb; }