From 3b54d90350fa5bc2838e861a087be34e347c24a9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 13 Apr 2022 21:00:43 +0800 Subject: [PATCH] refact(cluster): node mgmt --- include/dnode/mgmt/dnode.h | 2 +- source/dnode/mgmt/implement/src/dmExec.c | 309 +++++++++----------- source/dnode/mgmt/implement/src/dmHandle.c | 10 +- source/dnode/mgmt/implement/src/dmObj.c | 6 - source/dnode/mgmt/mgmt_bnode/inc/bmInt.h | 4 - source/dnode/mgmt/mgmt_bnode/src/bmHandle.c | 22 +- source/dnode/mgmt/mgmt_bnode/src/bmInt.c | 73 ++--- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 35 +-- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 21 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 20 +- source/dnode/mgmt/mgmt_qnode/inc/qmInt.h | 4 - source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 22 +- source/dnode/mgmt/mgmt_qnode/src/qmInt.c | 76 ++--- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 4 - source/dnode/mgmt/mgmt_snode/src/smHandle.c | 22 +- source/dnode/mgmt/mgmt_snode/src/smInt.c | 73 +---- 17 files changed, 286 insertions(+), 419 deletions(-) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 352f6d5b53..b48fd23204 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -51,7 +51,7 @@ typedef struct { int8_t ntype; } SDnodeOpt; -typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_CHILD } EDndEvent; +typedef enum { DND_EVENT_START = 0, DND_EVENT_STOP = 1, DND_EVENT_CHILD = 2 } EDndEvent; /** * @brief Initialize and start the dnode. diff --git a/source/dnode/mgmt/implement/src/dmExec.c b/source/dnode/mgmt/implement/src/dmExec.c index be7a8f0dd2..7c5d049aff 100644 --- a/source/dnode/mgmt/implement/src/dmExec.c +++ b/source/dnode/mgmt/implement/src/dmExec.c @@ -99,40 +99,78 @@ static int32_t dmRunNodeProc(SMgmtWrapper *pWrapper) { return 0; } -static int32_t dmOpenNodeImp(SMgmtWrapper *pWrapper) { +int32_t dmOpenNode(SMgmtWrapper *pWrapper) { if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); return -1; } - if ((*pWrapper->fp.openFp)(pWrapper) != 0) { - dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); - return -1; + if (pWrapper->procType == DND_PROC_SINGLE || pWrapper->procType == DND_PROC_CHILD) { + if ((*pWrapper->fp.openFp)(pWrapper) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + return -1; + } + + dDebug("node:%s, has been opened", pWrapper->name); + pWrapper->deployed = true; + } else { + if (dmInitNodeProc(pWrapper) != 0) return -1; + if (dmWriteShmFile(pWrapper) != 0) return -1; + if (dmRunNodeProc(pWrapper) != 0) return -1; + } + + if (pWrapper->procType == DND_PROC_CHILD) { + SProcCfg cfg = dmGenProcCfg(pWrapper); + cfg.isChild = true; + pWrapper->procObj = taosProcInit(&cfg); + if (pWrapper->procObj == NULL) { + dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); + return -1; + } } - dDebug("node:%s, has been opened", pWrapper->name); - pWrapper->deployed = true; return 0; } -int32_t dmOpenNode(SMgmtWrapper *pWrapper) { - SDnode *pDnode = pWrapper->pDnode; - if (pDnode->ptype == DND_PROC_SINGLE) { - return dmOpenNodeImp(pWrapper); - } else if (pDnode->ptype == DND_PROC_PARENT) { - if (dmInitNodeProc(pWrapper) != 0) return -1; - if (dmWriteShmFile(pWrapper) != 0) return -1; - if (dmRunNodeProc(pWrapper) != 0) return -1; +int32_t dmStartNode(SMgmtWrapper *pWrapper) { + if (pWrapper->procType == DND_PROC_PARENT) { + dInfo("node:%s, not start in parent process", pWrapper->name); + } else if (pWrapper->procType == DND_PROC_CHILD) { + dInfo("node:%s, start in child process", pWrapper->name); + if (taosProcRun(pWrapper->procObj) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } + } else { + if (pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } } + return 0; } -static void dmCloseNodeImp(SMgmtWrapper *pWrapper) { - dDebug("node:%s, mgmt start to close", pWrapper->name); +void dmStopNode(SMgmtWrapper *pWrapper) { if (pWrapper->fp.stopFp != NULL) { (*pWrapper->fp.stopFp)(pWrapper); } +} + +void dmCloseNode(SMgmtWrapper *pWrapper) { + dInfo("node:%s, start to close", pWrapper->name); + if (pWrapper->procType == DND_PROC_PARENT) { + if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { + dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); + taosKillProc(pWrapper->procId); + dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); + taosWaitProc(pWrapper->procId); + dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); + } + } + + dmStopNode(pWrapper); pWrapper->required = false; taosWLockLatch(&pWrapper->latch); @@ -150,50 +188,56 @@ static void dmCloseNodeImp(SMgmtWrapper *pWrapper) { taosProcCleanup(pWrapper->procObj); pWrapper->procObj = NULL; } - dDebug("node:%s, mgmt has been closed", pWrapper->name); -} -void dmCloseNode(SMgmtWrapper *pWrapper) { - if (pWrapper->pDnode->ptype == DND_PROC_PARENT) { - if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { - dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); - taosKillProc(pWrapper->procId); - dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); - taosWaitProc(pWrapper->procId); - dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); - } - } - dmCloseNodeImp(pWrapper); + dInfo("node:%s, has been closed", pWrapper->name); } -static void dmProcessProcHandle(void *handle) { - dWarn("handle:%p, the child process dies and send an offline rsp", handle); - SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE}; - rpcSendResponse(&rpcMsg); -} +static int32_t dmOpenNodes(SDnode *pDnode) { + if (pDnode->ptype == DND_PROC_CHILD) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + pWrapper->required = dmRequireNode(pWrapper); + if (!pWrapper->required) { + dError("dnode:%s, failed to open since not required", pWrapper->name); + } -static int32_t dmRunInSingleProcess(SDnode *pDnode) { - dInfo("dnode run in single process"); - pDnode->ptype = DND_PROC_SINGLE; + pWrapper->procType = DND_PROC_CHILD; - for (EDndNodeType n = DNODE; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - pWrapper->required = dmRequireNode(pWrapper); - if (!pWrapper->required) continue; + SMsgCb msgCb = pDnode->data.msgCb; + msgCb.pWrapper = pWrapper; + tmsgSetDefaultMsgCb(&msgCb); - if (dmOpenNodeImp(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + if (dmOpenNode(pWrapper) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; } + } else { + for (EDndNodeType n = DNODE; n < NODE_END; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + pWrapper->required = dmRequireNode(pWrapper); + if (!pWrapper->required) continue; + + if (pDnode->ptype == DND_PROC_PARENT && n != DNODE) { + pWrapper->procType = DND_PROC_PARENT; + } else { + pWrapper->procType = DND_PROC_SINGLE; + } + + if (dmOpenNode(pWrapper) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + return -1; + } + } } dmSetStatus(pDnode, DND_STAT_RUNNING); + return 0; +} +static int32_t dmStartNodes(SDnode *pDnode) { for (EDndNodeType n = DNODE; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; - if (pWrapper->fp.startFp == NULL) continue; - if ((*pWrapper->fp.startFp)(pWrapper) != 0) { + if (dmStartNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; } @@ -201,159 +245,78 @@ static int32_t dmRunInSingleProcess(SDnode *pDnode) { dInfo("TDengine initialized successfully"); dmReportStartup(pDnode, "TDengine", "initialized successfully"); - while (1) { - if (pDnode->event == DND_EVENT_STOP) { - dInfo("dnode is about to stop"); - dmSetStatus(pDnode, DND_STAT_STOPPED); - break; - } - taosMsleep(100); - } - return 0; } -static int32_t dmRunInParentProcess(SDnode *pDnode) { - dInfo("dnode run in parent process"); - pDnode->ptype = DND_PROC_PARENT; - - SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; - if (dmOpenNodeImp(pDWrapper) != 0) { - dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); - return -1; - } - - for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { +static void dmStopNodes(SDnode *pDnode) { + for (EDndNodeType n = DNODE; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - pWrapper->required = dmRequireNode(pWrapper); - if (!pWrapper->required) continue; - if (dmInitNodeProc(pWrapper) != 0) return -1; - - if (dmWriteShmFile(pWrapper) != 0) { - dError("failed to write runtime file since %s", terrstr()); - return -1; - } + dmStopNode(pWrapper); } +} - for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { +static void dmCloseNodes(SDnode *pDnode) { + for (EDndNodeType n = DNODE; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (dmRunNodeProc(pWrapper) != 0) return -1; + dmCloseNode(pWrapper); } +} - dmSetStatus(pDnode, DND_STAT_RUNNING); - - if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) { - dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); - return -1; - } - - dInfo("TDengine initialized successfully"); - dmReportStartup(pDnode, "TDengine", "initialized successfully"); - - while (1) { - if (pDnode->event == DND_EVENT_STOP) { - dInfo("dnode is about to stop"); - dmSetStatus(pDnode, DND_STAT_STOPPED); +static void dmProcessProcHandle(void *handle) { + dWarn("handle:%p, the child process dies and send an offline rsp", handle); + SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE}; + rpcSendResponse(&rpcMsg); +} - for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_END) continue; - - if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { - dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); - taosKillProc(pWrapper->procId); - dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); - taosWaitProc(pWrapper->procId); - dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); - } - } - break; - } else { - for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_END) continue; - - if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { - dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); - taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); - dmNewNodeProc(pWrapper, n); - } +static void dmWatchNodes(SDnode *pDnode) { + if (pDnode->ptype == DND_PROC_PARENT) { + for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (pDnode->ntype == NODE_END) continue; + + if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { + dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); + taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); + dmNewNodeProc(pWrapper, n); } } - - taosMsleep(100); } - - return 0; } -static int32_t dmRunInChildProcess(SDnode *pDnode) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - dInfo("%s run in child process", pWrapper->name); - pDnode->ptype = DND_PROC_CHILD; - - pWrapper->required = dmRequireNode(pWrapper); - if (!pWrapper->required) { - dError("%s does not require startup", pWrapper->name); - return -1; - } - - SMsgCb msgCb = dmGetMsgcb(pWrapper); - tmsgSetDefaultMsgCb(&msgCb); - pWrapper->procType = DND_PROC_CHILD; - - if (dmOpenNodeImp(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; +int32_t dmRun(SDnode *pDnode) { + if (!tsMultiProcess) { + pDnode->ptype = DND_PROC_SINGLE; + dInfo("dnode run in single process"); + } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) { + pDnode->ptype = DND_PROC_PARENT; + dInfo("dnode run in parent process"); + } else { + pDnode->ptype = DND_PROC_CHILD; + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + dInfo("%s run in child process", pWrapper->name); } - SProcCfg cfg = dmGenProcCfg(pWrapper); - cfg.isChild = true; - pWrapper->procObj = taosProcInit(&cfg); - if (pWrapper->procObj == NULL) { - dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); + if (dmOpenNodes(pDnode) != 0) { + dError("failed to open nodes since %s", terrstr()); return -1; } - if (pWrapper->fp.startFp != NULL) { - if ((*pWrapper->fp.startFp)(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } - } - - dmSetStatus(pDnode, DND_STAT_RUNNING); - - if (taosProcRun(pWrapper->procObj) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + if (dmStartNodes(pDnode) != 0) { + dError("failed to start nodes since %s", terrstr()); return -1; } - dInfo("TDengine initialized successfully"); - dmReportStartup(pDnode, "TDengine", "initialized successfully"); while (1) { - if (pDnode->event == DND_EVENT_STOP) { - dInfo("%s is about to stop", pWrapper->name); + taosMsleep(100); + if (pDnode->event & DND_EVENT_STOP) { + dInfo("dnode is about to stop"); dmSetStatus(pDnode, DND_STAT_STOPPED); - break; + dmStopNodes(pDnode); + dmCloseNodes(pDnode); + return 0; + } else { + dmWatchNodes(pDnode); } - taosMsleep(100); - } - - return 0; -} - -int32_t dmRun(SDnode *pDnode) { - if (!tsMultiProcess) { - return dmRunInSingleProcess(pDnode); - } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) { - return dmRunInParentProcess(pDnode); - } else { - return dmRunInChildProcess(pDnode); } - - return 0; } diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index b5c81850ef..8f9a316c72 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -152,18 +152,20 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) int32_t code = (*pWrapper->fp.dropFp)(pWrapper, pMsg); if (code != 0) { dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); - pWrapper->required = true; - pWrapper->deployed = true; } else { dDebug("node:%s, has been dropped", pWrapper->name); pWrapper->required = false; pWrapper->deployed = false; - dmCloseNode(pWrapper); + taosRemoveDir(pWrapper->path); } taosWUnLockLatch(&pWrapper->latch); dmReleaseWrapper(pWrapper); - return 0; + + if (code == 0) { + dmCloseNode(pWrapper); + } + return code; } static void dmSetMgmtMsgHandle(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/implement/src/dmObj.c b/source/dnode/mgmt/implement/src/dmObj.c index 2a9ea87530..be89814d6a 100644 --- a/source/dnode/mgmt/implement/src/dmObj.c +++ b/source/dnode/mgmt/implement/src/dmObj.c @@ -140,12 +140,6 @@ _OVER: void dmClose(SDnode *pDnode) { if (pDnode == NULL) return; - - for (EDndNodeType n = DNODE; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dmCloseNode(pWrapper); - } - dmClearVars(pDnode); dInfo("dnode is closed, data:%p", pDnode); } diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 8a7442efc3..3adcc1206b 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -33,10 +33,6 @@ typedef struct SBnodeMgmt { SSingleWorker monitorWorker; } SBnodeMgmt; -// bmInt.c -int32_t bmOpen(SMgmtWrapper *pWrapper); -int32_t bmDrop(SMgmtWrapper *pWrapper); - // bmHandle.c void bmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index ac96a5fd91..49bf9201e1 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -57,10 +57,15 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId); return -1; - } else { - // return dmOpenNode(pWrapper); - return 0; } + + bool deployed = true; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write bnode file since %s", terrstr()); + return -1; + } + + return 0; } int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { @@ -77,10 +82,15 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; - } else { - // dmCloseNode(pWrapper); - return bmDrop(pWrapper); } + + bool deployed = false; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write bnode file since %s", terrstr()); + return -1; + } + + return 0; } void bmInitMsgHandle(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index 860c5d465c..f635df8ec7 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -24,63 +24,17 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { pOption->msgCb = msgCb; } -static int32_t bmOpenImp(SBnodeMgmt *pMgmt) { - SBnodeOpt option = {0}; - bmInitOption(pMgmt, &option); - - pMgmt->pBnode = bndOpen(pMgmt->path, &option); - if (pMgmt->pBnode == NULL) { - dError("failed to open bnode since %s", terrstr()); - return -1; - } - - if (bmStartWorker(pMgmt) != 0) { - dError("failed to start bnode worker since %s", terrstr()); - return -1; - } - - bool deployed = true; - if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) { - dError("failed to write bnode file since %s", terrstr()); - return -1; - } - - return 0; -} +static void bmClose(SMgmtWrapper *pWrapper) { + SBnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; -static void bmCloseImp(SBnodeMgmt *pMgmt) { + dInfo("bnode-mgmt start to cleanup"); if (pMgmt->pBnode != NULL) { bmStopWorker(pMgmt); bndClose(pMgmt->pBnode); pMgmt->pBnode = NULL; } -} - -int32_t bmDrop(SMgmtWrapper *pWrapper) { - SBnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return 0; - - dInfo("bnode-mgmt start to drop"); - bool deployed = false; - if (dmWriteFile(pWrapper, deployed) != 0) { - dError("failed to drop bnode since %s", terrstr()); - return -1; - } - bmCloseImp(pMgmt); - taosRemoveDir(pMgmt->path); - pWrapper->pMgmt = NULL; - taosMemoryFree(pMgmt); - dInfo("bnode-mgmt is dropped"); - return 0; -} - -static void bmClose(SMgmtWrapper *pWrapper) { - SBnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dInfo("bnode-mgmt start to cleanup"); - bmCloseImp(pMgmt); pWrapper->pMgmt = NULL; taosMemoryFree(pMgmt); dInfo("bnode-mgmt is cleaned up"); @@ -99,15 +53,22 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) { pMgmt->pWrapper = pWrapper; pWrapper->pMgmt = pMgmt; - int32_t code = bmOpenImp(pMgmt); - if (code != 0) { - dError("failed to init bnode-mgmt since %s", terrstr()); + SBnodeOpt option = {0}; + bmInitOption(pMgmt, &option); + pMgmt->pBnode = bndOpen(pMgmt->path, &option); + if (pMgmt->pBnode == NULL) { + dError("failed to open bnode since %s", terrstr()); bmClose(pWrapper); - } else { - dInfo("bnode-mgmt is initialized"); + return -1; } - return code; + if (bmStartWorker(pMgmt) != 0) { + dError("failed to start bnode worker since %s", terrstr()); + bmClose(pWrapper); + return -1; + } + + return 0; } void bmSetMgmtFp(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 0742d2b9f9..d76c1f6679 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -41,7 +41,7 @@ typedef struct SMnodeMgmt { // mmFile.c int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed); +int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed); // mmInt.c int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index f3d0d666bb..9724cc04a2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -105,9 +105,11 @@ PRASE_MNODE_OVER: return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { - char file[PATH_MAX]; - snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); +int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed) { + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%smnode.json.bak", pWrapper->path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pWrapper->path, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -121,19 +123,21 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", deployed); - len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - for (int32_t i = 0; i < pMgmt->replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < pMgmt->replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); + if (pReq != NULL) { + len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); + for (int32_t i = 0; i < pReq->replica; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); + if (i < pReq->replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }],\n"); + } } } + len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); len += snprintf(content + len, maxLen - len, "}\n"); taosWriteFile(pFile, content, len); @@ -141,9 +145,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { taosCloseFile(&pFile); taosMemoryFree(content); - char realfile[PATH_MAX]; - snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); - if (taosRenameFile(file, realfile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to rename %s since %s", file, terrstr()); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 5e277b6ed2..c6149a9a7e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -60,9 +60,15 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; - } else { - return mmOpenFromMsg(pWrapper, &createReq); } + + bool deployed = true; + if (mmWriteFile(pWrapper, &createReq, deployed) != 0) { + dError("failed to write mnode file since %s", terrstr()); + return -1; + } + + return 0; } int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { @@ -79,10 +85,15 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; - } else { - // dmCloseNode(pWrapper); - return mmDrop(pWrapper); } + + bool deployed = false; + if (mmWriteFile(pWrapper, NULL, deployed) != 0) { + dError("failed to write mnode file since %s", terrstr()); + return -1; + } + + return 0; } int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index ce0459686c..49d5d79491 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -134,12 +134,6 @@ static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) { return -1; } - bool deployed = true; - if (mmWriteFile(pMgmt, deployed) != 0) { - dError("failed to write mnode file since %s", terrstr()); - return -1; - } - return 0; } @@ -164,11 +158,11 @@ int32_t mmDrop(SMgmtWrapper *pWrapper) { if (pMgmt == NULL) return 0; dInfo("mnode-mgmt start to drop"); - bool deployed = false; - if (mmWriteFile(pMgmt, deployed) != 0) { - dError("failed to drop mnode since %s", terrstr()); - return -1; - } + // bool deployed = false; + // if (mmWriteFile(pMgmt, deployed) != 0) { + // dError("failed to drop mnode since %s", terrstr()); + // return -1; + // } mmCloseImp(pMgmt); taosRemoveDir(pMgmt->path); @@ -229,7 +223,9 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) { static void mmStop(SMgmtWrapper *pWrapper) { dDebug("mnode-mgmt start to stop"); SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mndStop(pMgmt->pMnode); + if (pMgmt != NULL) { + mndStop(pMgmt->pMnode); + } } void mmSetMgmtFp(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index c0ed6599bd..d52fbff683 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -34,10 +34,6 @@ typedef struct SQnodeMgmt { SSingleWorker monitorWorker; } SQnodeMgmt; -// qmInt.c -int32_t qmOpen(SMgmtWrapper *pWrapper); -int32_t qmDrop(SMgmtWrapper *pWrapper); - // qmHandle.c void qmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 209c0b3f6c..d8a54cfbd5 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -57,10 +57,15 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; - } else { - // return dmOpenNode(pWrapper); - return 0; } + + bool deployed = true; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write qnode file since %s", terrstr()); + return -1; + } + + return 0; } int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { @@ -77,10 +82,15 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; - } else { - // dmCloseNode(pWrapper); - return qmDrop(pWrapper); } + + bool deployed = false; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write qnode file since %s", terrstr()); + return -1; + } + + return 0; } void qmInitMsgHandle(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index c73d337136..d8d02b2619 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -27,69 +27,23 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { pOption->msgCb = msgCb; } -static int32_t qmOpenImp(SQnodeMgmt *pMgmt) { - SQnodeOpt option = {0}; - qmInitOption(pMgmt, &option); - - pMgmt->pQnode = qndOpen(&option); - if (pMgmt->pQnode == NULL) { - dError("failed to open qnode since %s", terrstr()); - return -1; - } - - if (qmStartWorker(pMgmt) != 0) { - dError("failed to start qnode worker since %s", terrstr()); - return -1; - } - - bool deployed = true; - if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) { - dError("failed to write qnode file since %s", terrstr()); - return -1; - } - - return 0; -} +static void qmClose(SMgmtWrapper *pWrapper) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; -static void qmCloseImp(SQnodeMgmt *pMgmt) { + dInfo("qnode-mgmt start to cleanup"); if (pMgmt->pQnode != NULL) { qmStopWorker(pMgmt); qndClose(pMgmt->pQnode); pMgmt->pQnode = NULL; } -} - -int32_t qmDrop(SMgmtWrapper *pWrapper) { - SQnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return 0; - - dInfo("qnode-mgmt start to drop"); - bool deployed = false; - if (dmWriteFile(pWrapper, deployed) != 0) { - dError("failed to drop qnode since %s", terrstr()); - return -1; - } - qmCloseImp(pMgmt); - taosRemoveDir(pMgmt->path); - pWrapper->pMgmt = NULL; - taosMemoryFree(pMgmt); - dInfo("qnode-mgmt is dropped"); - return 0; -} - -static void qmClose(SMgmtWrapper *pWrapper) { - SQnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dInfo("qnode-mgmt start to cleanup"); - qmCloseImp(pMgmt); pWrapper->pMgmt = NULL; taosMemoryFree(pMgmt); dInfo("qnode-mgmt is cleaned up"); } -int32_t qmOpen(SMgmtWrapper *pWrapper) { +static int32_t qmOpen(SMgmtWrapper *pWrapper) { dInfo("qnode-mgmt start to init"); SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt)); if (pMgmt == NULL) { @@ -102,15 +56,23 @@ int32_t qmOpen(SMgmtWrapper *pWrapper) { pMgmt->pWrapper = pWrapper; pWrapper->pMgmt = pMgmt; - int32_t code = qmOpenImp(pMgmt); - if (code != 0) { - dError("failed to init qnode-mgmt since %s", terrstr()); + SQnodeOpt option = {0}; + qmInitOption(pMgmt, &option); + pMgmt->pQnode = qndOpen(&option); + if (pMgmt->pQnode == NULL) { + dError("failed to open qnode since %s", terrstr()); qmClose(pWrapper); - } else { - dInfo("qnode-mgmt is initialized"); + return -1; } - return code; + if (qmStartWorker(pMgmt) != 0) { + dError("failed to start qnode worker since %s", terrstr()); + qmClose(pWrapper); + return -1; + } + + dInfo("qnode-mgmt is initialized"); + return 0; } void qmSetMgmtFp(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 5045f21272..9eb48af733 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -36,10 +36,6 @@ typedef struct SSnodeMgmt { SSingleWorker monitorWorker; } SSnodeMgmt; -// smInt.c -int32_t smOpen(SMgmtWrapper *pWrapper); -int32_t smDrop(SMgmtWrapper *pWrapper); - // smHandle.c void smInitMsgHandle(SMgmtWrapper *pWrapper); int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 4c0e0aef8d..defc5ab136 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -57,10 +57,15 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; - } else { - // return dmOpenNode(pWrapper); - return 0; } + + bool deployed = true; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write snode file since %s", terrstr()); + return -1; + } + + return 0; } int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { @@ -77,10 +82,15 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; - } else { - return smDrop(pWrapper); - // return dmCloseNode(pWrapper); } + + bool deployed = false; + if (dmWriteFile(pWrapper, deployed) != 0) { + dError("failed to write snode file since %s", terrstr()); + return -1; + } + + return 0; } void smInitMsgHandle(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 3b17d775f7..8adf643a2b 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -24,63 +24,17 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { pOption->msgCb = msgCb; } -static int32_t smOpenImp(SSnodeMgmt *pMgmt) { - SSnodeOpt option = {0}; - smInitOption(pMgmt, &option); - - pMgmt->pSnode = sndOpen(pMgmt->path, &option); - if (pMgmt->pSnode == NULL) { - dError("failed to open snode since %s", terrstr()); - return -1; - } - - if (smStartWorker(pMgmt) != 0) { - dError("failed to start snode worker since %s", terrstr()); - return -1; - } - - bool deployed = true; - if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) { - dError("failed to write snode file since %s", terrstr()); - return -1; - } - - return 0; -} +static void smClose(SMgmtWrapper *pWrapper) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; -static void smCloseImp(SSnodeMgmt *pMgmt) { + dInfo("snode-mgmt start to cleanup"); if (pMgmt->pSnode != NULL) { smStopWorker(pMgmt); sndClose(pMgmt->pSnode); pMgmt->pSnode = NULL; } -} - -int32_t smDrop(SMgmtWrapper *pWrapper) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return 0; - dInfo("snode-mgmt start to drop"); - bool deployed = false; - if (dmWriteFile(pWrapper, deployed) != 0) { - dError("failed to drop snode since %s", terrstr()); - return -1; - } - - smCloseImp(pMgmt); - taosRemoveDir(pMgmt->path); - pWrapper->pMgmt = NULL; - taosMemoryFree(pMgmt); - dInfo("snode-mgmt is dropped"); - return 0; -} - -static void smClose(SMgmtWrapper *pWrapper) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dInfo("snode-mgmt start to cleanup"); - smCloseImp(pMgmt); pWrapper->pMgmt = NULL; taosMemoryFree(pMgmt); dInfo("snode-mgmt is cleaned up"); @@ -99,15 +53,20 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { pMgmt->pWrapper = pWrapper; pWrapper->pMgmt = pMgmt; - int32_t code = smOpenImp(pMgmt); - if (code != 0) { - dError("failed to init snode-mgmt since %s", terrstr()); - smClose(pWrapper); - } else { - dInfo("snode-mgmt is initialized"); + SSnodeOpt option = {0}; + smInitOption(pMgmt, &option); + pMgmt->pSnode = sndOpen(pMgmt->path, &option); + if (pMgmt->pSnode == NULL) { + dError("failed to open snode since %s", terrstr()); + return -1; } - return code; + if (smStartWorker(pMgmt) != 0) { + dError("failed to start snode worker since %s", terrstr()); + return -1; + } + + return 0; } void smSetMgmtFp(SMgmtWrapper *pWrapper) { -- GitLab