From 823691b6b24d28b585d324861f115084abe38acb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 May 2022 21:25:32 +0800 Subject: [PATCH] refactor: node mgmt --- source/dnode/mgmt/mgmt_bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/mgmt_bnode/src/bmHandle.c | 10 +++--- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 4 +-- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 1 + source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 36 +++++++++---------- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 28 +++++++-------- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 12 ++++--- source/dnode/mgmt/mgmt_qnode/inc/qmInt.h | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 8 ++--- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 2 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 8 ++--- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 9 +++-- source/dnode/mgmt/node_mgmt/src/dmRun.c | 1 + source/dnode/mgmt/node_mgmt/src/dmTransport.c | 16 ++++++--- source/dnode/mgmt/node_util/inc/dmUtil.h | 11 +++--- 16 files changed, 85 insertions(+), 67 deletions(-) diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 0ae0e7ef70..1923d049a5 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -36,7 +36,7 @@ typedef struct SBnodeMgmt { // bmHandle.c SArray *bmGetMsgHandles(); -int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq); diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index 86872ec2df..d87f832262 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -43,7 +43,7 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) { return 0; } -int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateBnodeReq createReq = {0}; @@ -52,14 +52,14 @@ int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && createReq.dnodeId != pMgmt->dnodeId) { + if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pMgmt->dnodeId); + dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->dnodeId); return -1; } bool deployed = true; - if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { + if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { dError("failed to write bnode file since %s", terrstr()); return -1; } @@ -76,7 +76,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 95a01c4afb..e0d37db254 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -27,8 +27,8 @@ typedef struct SDnodeMgmt { SMsgCb msgCb; const char *path; const char *name; - TdThread *statusThreadId; - TdThread *monitorThreadId; + TdThread statusThread; + TdThread monitorThread; SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 7eabf9f1b7..9e40b3d022 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -27,6 +27,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { } static void dmStopMgmt(SDnodeMgmt *pMgmt) { + pMgmt->data.stopped = true; dmStopMonitorThread(pMgmt); dmStopStatusThread(pMgmt); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 98b2d8b54b..ebbf6c678b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -24,8 +24,7 @@ static void *dmStatusThreadFp(void *param) { while (1) { taosMsleep(200); - taosThreadTestCancel(); - if (pMgmt->data.dropped) continue; + if (pMgmt->data.dropped || pMgmt->data.stopped) break; int64_t curTime = taosGetTimestampMs(); float interval = (curTime - lastTime) / 1000.0f; @@ -46,8 +45,7 @@ static void *dmMonitorThreadFp(void *param) { while (1) { taosMsleep(200); - taosThreadTestCancel(); - if (pMgmt->data.dropped) continue; + if (pMgmt->data.dropped || pMgmt->data.stopped) break; int64_t curTime = taosGetTimestampMs(); float interval = (curTime - lastTime) / 1000.0f; @@ -61,40 +59,42 @@ static void *dmMonitorThreadFp(void *param) { } int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { - pMgmt->statusThreadId = taosCreateThread(dmStatusThreadFp, pMgmt); - if (pMgmt->statusThreadId == NULL) { - dError("failed to init dnode status thread"); - terrno = TSDB_CODE_OUT_OF_MEMORY; + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) { + dError("failed to create status thread since %s", strerror(errno)); return -1; } + taosThreadAttrDestroy(&thAttr); tmsgReportStartup("dnode-status", "initialized"); return 0; } void dmStopStatusThread(SDnodeMgmt *pMgmt) { - if (pMgmt->statusThreadId != NULL) { - taosDestoryThread(pMgmt->statusThreadId); - pMgmt->statusThreadId = NULL; + if (taosCheckPthreadValid(pMgmt->statusThread)) { + taosThreadJoin(pMgmt->statusThread, NULL); } } int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { - pMgmt->monitorThreadId = taosCreateThread(dmMonitorThreadFp, pMgmt); - if (pMgmt->monitorThreadId == NULL) { - dError("failed to init dnode monitor thread"); - terrno = TSDB_CODE_OUT_OF_MEMORY; + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) { + dError("failed to create monitor thread since %s", strerror(errno)); return -1; } + taosThreadAttrDestroy(&thAttr); tmsgReportStartup("dnode-monitor", "initialized"); return 0; } void dmStopMonitorThread(SDnodeMgmt *pMgmt) { - if (pMgmt->monitorThreadId != NULL) { - taosDestoryThread(pMgmt->monitorThreadId); - pMgmt->monitorThreadId = NULL; + if (taosCheckPthreadValid(pMgmt->monitorThread)) { + taosThreadJoin(pMgmt->monitorThread, NULL); } } diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 4b34406b77..87dbe702be 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -48,7 +48,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); // mmHandle.c SArray *mmGetMsgHandles(); -int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index bf3606a4c3..df377fefe7 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -124,21 +124,19 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - if (pReq != NULL || pMgmt != NULL) { - int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); - for (int32_t i = 0; i < replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - if (pReq != NULL) { - pReplica = &pReq->replicas[i]; - } - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }],\n"); - } + int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); + for (int32_t i = 0; i < replica; ++i) { + SReplica *pReplica = &pMgmt->replicas[i]; + if (pReq != NULL) { + pReplica = &pReq->replicas[i]; + } + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); + if (i < replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }],\n"); } } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 6ee397d4e5..5548a23c55 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -72,7 +72,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { return 0; } -int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateMnodeReq createReq = {0}; @@ -81,14 +81,18 @@ int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (createReq.replica <= 1 || createReq.dnodeId != pMgmt->dnodeId) { + if (createReq.replica <= 1 || (createReq.dnodeId != pInput->dnodeId && pInput->dnodeId != 0)) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; } bool deployed = true; - if (mmWriteFile(pMgmt, &createReq, deployed) != 0) { + + SMnodeMgmt mgmt = {0}; + mgmt.path = pInput->path; + mgmt.name = pInput->name; + if (mmWriteFile(&mgmt, &createReq, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -105,7 +109,7 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 0213cdb61e..8b48113dd3 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -37,7 +37,7 @@ typedef struct SQnodeMgmt { // qmHandle.c SArray *qmGetMsgHandles(); -int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index ae95c4c7c1..c500176b15 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -43,7 +43,7 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) { return 0; } -int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateQnodeReq createReq = {0}; @@ -52,14 +52,14 @@ int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pMgmt->dnodeId) { + if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; } bool deployed = true; - if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { + if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { dError("failed to write qnode file since %s", terrstr()); return -1; } @@ -76,7 +76,7 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index ca1865c484..a1ab9ba077 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -39,7 +39,7 @@ typedef struct SSnodeMgmt { // smHandle.c SArray *smGetMsgHandles(); -int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 1cc2855548..b0f0b73cbc 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -43,7 +43,7 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) { return 0; } -int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateSnodeReq createReq = {0}; @@ -52,14 +52,14 @@ int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pMgmt->dnodeId) { + if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; } bool deployed = true; - if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { + if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) { dError("failed to write snode file since %s", terrstr()); return -1; } @@ -76,7 +76,7 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index e389329a20..88372909ac 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -320,7 +320,12 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs return -1; } - int32_t code = (*pWrapper->func.createFp)(pWrapper, pMsg); + SMgmtInputOpt *pInput = &pWrapper->pDnode->input; + pInput->name = pWrapper->name; + pInput->path = pWrapper->path; + pInput->msgCb = dmGetMsgcb(pWrapper); + + int32_t code = (*pWrapper->func.createFp)(pInput, pMsg); if (code != 0) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { @@ -345,7 +350,7 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) taosThreadMutexLock(&pDnode->mutex); - int32_t code = (*pWrapper->func.dropFp)(pWrapper, pMsg); + int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg); if (code != 0) { dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); } else { diff --git a/source/dnode/mgmt/node_mgmt/src/dmRun.c b/source/dnode/mgmt/node_mgmt/src/dmRun.c index fd551f3574..f467cd699a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmRun.c +++ b/source/dnode/mgmt/node_mgmt/src/dmRun.c @@ -181,6 +181,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) { void dmStopNode(SMgmtWrapper *pWrapper) { if (pWrapper->func.stopFp != NULL && pWrapper->pMgmt != NULL) { (*pWrapper->func.stopFp)(pWrapper->pMgmt); + dDebug("node:%s, has been stopped", pWrapper->name); } } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 99a1fce3a8..02961c109f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -257,13 +257,21 @@ static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { } static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { - rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); + if (pDnode->status != DND_STAT_RUNNING) { + pRsp->code = TSDB_CODE_NODE_OFFLINE; + } else { + rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); + } } static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) { - SEpSet epSet = {0}; - dmGetMnodeEpSet(pWrapper->pDnode, &epSet); - dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp); + if (pWrapper->pDnode->status != DND_STAT_RUNNING) { + pRsp->code = TSDB_CODE_NODE_OFFLINE; + } else { + SEpSet epSet = {0}; + dmGetMnodeEpSet(pWrapper->pDnode, &epSet); + dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp); + } } static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index b728719af6..11bf528e1e 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -54,10 +54,10 @@ extern "C" { typedef enum { DNODE = 0, - VNODE = 1, - QNODE = 2, - SNODE = 3, - MNODE = 4, + MNODE = 1, + VNODE = 2, + QNODE = 3, + SNODE = 4, BNODE = 5, NODE_END = 6, } EDndNodeType; @@ -117,7 +117,7 @@ typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutp typedef void (*NodeCloseFp)(void *pMgmt); typedef int32_t (*NodeStartFp)(void *pMgmt); typedef void (*NodeStopFp)(void *pMgmt); -typedef int32_t (*NodeCreateFp)(void *pMgmt, SNodeMsg *pMsg); +typedef int32_t (*NodeCreateFp)(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); typedef int32_t (*NodeDropFp)(void *pMgmt, SNodeMsg *pMsg); typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle @@ -167,6 +167,7 @@ typedef struct { ESyncState vndState; ESyncState mndState; bool dropped; + bool stopped; SEpSet mnodeEps; SArray *dnodeEps; SHashObj *dnodeHash; -- GitLab