提交 3b54d903 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

上级 1dd4984e
......@@ -51,7 +51,7 @@ typedef struct {
int8_t ntype;
} SDnodeOpt;
typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_CHILD } EDndEvent;
typedef enum { DND_EVENT_START = 0, DND_EVENT_STOP = 1, DND_EVENT_CHILD = 2 } EDndEvent;
/**
* @brief Initialize and start the dnode.
......
......@@ -99,40 +99,78 @@ static int32_t dmRunNodeProc(SMgmtWrapper *pWrapper) {
return 0;
}
static int32_t dmOpenNodeImp(SMgmtWrapper *pWrapper) {
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
return -1;
}
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
if (pWrapper->procType == DND_PROC_SINGLE || pWrapper->procType == DND_PROC_CHILD) {
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
} else {
if (dmInitNodeProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) return -1;
if (dmRunNodeProc(pWrapper) != 0) return -1;
}
if (pWrapper->procType == DND_PROC_CHILD) {
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = true;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
}
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
return 0;
}
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
if (pDnode->ptype == DND_PROC_SINGLE) {
return dmOpenNodeImp(pWrapper);
} else if (pDnode->ptype == DND_PROC_PARENT) {
if (dmInitNodeProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) return -1;
if (dmRunNodeProc(pWrapper) != 0) return -1;
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
if (pWrapper->procType == DND_PROC_PARENT) {
dInfo("node:%s, not start in parent process", pWrapper->name);
} else if (pWrapper->procType == DND_PROC_CHILD) {
dInfo("node:%s, start in child process", pWrapper->name);
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
} else {
if (pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
return 0;
}
static void dmCloseNodeImp(SMgmtWrapper *pWrapper) {
dDebug("node:%s, mgmt start to close", pWrapper->name);
void dmStopNode(SMgmtWrapper *pWrapper) {
if (pWrapper->fp.stopFp != NULL) {
(*pWrapper->fp.stopFp)(pWrapper);
}
}
void dmCloseNode(SMgmtWrapper *pWrapper) {
dInfo("node:%s, start to close", pWrapper->name);
if (pWrapper->procType == DND_PROC_PARENT) {
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
}
}
dmStopNode(pWrapper);
pWrapper->required = false;
taosWLockLatch(&pWrapper->latch);
......@@ -150,50 +188,56 @@ static void dmCloseNodeImp(SMgmtWrapper *pWrapper) {
taosProcCleanup(pWrapper->procObj);
pWrapper->procObj = NULL;
}
dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
void dmCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->pDnode->ptype == DND_PROC_PARENT) {
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
}
}
dmCloseNodeImp(pWrapper);
dInfo("node:%s, has been closed", pWrapper->name);
}
static void dmProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
}
static int32_t dmOpenNodes(SDnode *pDnode) {
if (pDnode->ptype == DND_PROC_CHILD) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) {
dError("dnode:%s, failed to open since not required", pWrapper->name);
}
static int32_t dmRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process");
pDnode->ptype = DND_PROC_SINGLE;
pWrapper->procType = DND_PROC_CHILD;
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
SMsgCb msgCb = pDnode->data.msgCb;
msgCb.pWrapper = pWrapper;
tmsgSetDefaultMsgCb(&msgCb);
if (dmOpenNodeImp(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
} else {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (pDnode->ptype == DND_PROC_PARENT && n != DNODE) {
pWrapper->procType = DND_PROC_PARENT;
} else {
pWrapper->procType = DND_PROC_SINGLE;
}
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
}
}
dmSetStatus(pDnode, DND_STAT_RUNNING);
return 0;
}
static int32_t dmStartNodes(SDnode *pDnode) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
if (dmStartNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -201,159 +245,78 @@ static int32_t dmRunInSingleProcess(SDnode *pDnode) {
dInfo("TDengine initialized successfully");
dmReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED);
break;
}
taosMsleep(100);
}
return 0;
}
static int32_t dmRunInParentProcess(SDnode *pDnode) {
dInfo("dnode run in parent process");
pDnode->ptype = DND_PROC_PARENT;
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
if (dmOpenNodeImp(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
static void dmStopNodes(SDnode *pDnode) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (dmInitNodeProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) {
dError("failed to write runtime file since %s", terrstr());
return -1;
}
dmStopNode(pWrapper);
}
}
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
static void dmCloseNodes(SDnode *pDnode) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (dmRunNodeProc(pWrapper) != 0) return -1;
dmCloseNode(pWrapper);
}
}
dmSetStatus(pDnode, DND_STAT_RUNNING);
if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
dInfo("TDengine initialized successfully");
dmReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED);
static void dmProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
}
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_END) continue;
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
}
}
break;
} else {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_END) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
dmNewNodeProc(pWrapper, n);
}
static void dmWatchNodes(SDnode *pDnode) {
if (pDnode->ptype == DND_PROC_PARENT) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_END) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
dmNewNodeProc(pWrapper, n);
}
}
taosMsleep(100);
}
return 0;
}
static int32_t dmRunInChildProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process", pWrapper->name);
pDnode->ptype = DND_PROC_CHILD;
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) {
dError("%s does not require startup", pWrapper->name);
return -1;
}
SMsgCb msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
pWrapper->procType = DND_PROC_CHILD;
if (dmOpenNodeImp(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
int32_t dmRun(SDnode *pDnode) {
if (!tsMultiProcess) {
pDnode->ptype = DND_PROC_SINGLE;
dInfo("dnode run in single process");
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->ptype = DND_PROC_PARENT;
dInfo("dnode run in parent process");
} else {
pDnode->ptype = DND_PROC_CHILD;
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process", pWrapper->name);
}
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = true;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr());
return -1;
}
if (pWrapper->fp.startFp != NULL) {
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dmSetStatus(pDnode, DND_STAT_RUNNING);
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
if (dmStartNodes(pDnode) != 0) {
dError("failed to start nodes since %s", terrstr());
return -1;
}
dInfo("TDengine initialized successfully");
dmReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("%s is about to stop", pWrapper->name);
taosMsleep(100);
if (pDnode->event & DND_EVENT_STOP) {
dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED);
break;
dmStopNodes(pDnode);
dmCloseNodes(pDnode);
return 0;
} else {
dmWatchNodes(pDnode);
}
taosMsleep(100);
}
return 0;
}
int32_t dmRun(SDnode *pDnode) {
if (!tsMultiProcess) {
return dmRunInSingleProcess(pDnode);
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
return dmRunInParentProcess(pDnode);
} else {
return dmRunInChildProcess(pDnode);
}
return 0;
}
......@@ -152,18 +152,20 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
int32_t code = (*pWrapper->fp.dropFp)(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
pWrapper->required = true;
pWrapper->deployed = true;
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path);
}
taosWUnLockLatch(&pWrapper->latch);
dmReleaseWrapper(pWrapper);
return 0;
if (code == 0) {
dmCloseNode(pWrapper);
}
return code;
}
static void dmSetMgmtMsgHandle(SMgmtWrapper *pWrapper) {
......
......@@ -140,12 +140,6 @@ _OVER:
void dmClose(SDnode *pDnode) {
if (pDnode == NULL) return;
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dmCloseNode(pWrapper);
}
dmClearVars(pDnode);
dInfo("dnode is closed, data:%p", pDnode);
}
......@@ -33,10 +33,6 @@ typedef struct SBnodeMgmt {
SSingleWorker monitorWorker;
} SBnodeMgmt;
// bmInt.c
int32_t bmOpen(SMgmtWrapper *pWrapper);
int32_t bmDrop(SMgmtWrapper *pWrapper);
// bmHandle.c
void bmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
......
......@@ -57,10 +57,15 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId);
return -1;
} else {
// return dmOpenNode(pWrapper);
return 0;
}
bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr());
return -1;
}
return 0;
}
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
......@@ -77,10 +82,15 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
// dmCloseNode(pWrapper);
return bmDrop(pWrapper);
}
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr());
return -1;
}
return 0;
}
void bmInitMsgHandle(SMgmtWrapper *pWrapper) {
......
......@@ -24,63 +24,17 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
pOption->msgCb = msgCb;
}
static int32_t bmOpenImp(SBnodeMgmt *pMgmt) {
SBnodeOpt option = {0};
bmInitOption(pMgmt, &option);
pMgmt->pBnode = bndOpen(pMgmt->path, &option);
if (pMgmt->pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
return -1;
}
if (bmStartWorker(pMgmt) != 0) {
dError("failed to start bnode worker since %s", terrstr());
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr());
return -1;
}
return 0;
}
static void bmClose(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void bmCloseImp(SBnodeMgmt *pMgmt) {
dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode != NULL) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
}
int32_t bmDrop(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
dInfo("bnode-mgmt start to drop");
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to drop bnode since %s", terrstr());
return -1;
}
bmCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("bnode-mgmt is dropped");
return 0;
}
static void bmClose(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("bnode-mgmt start to cleanup");
bmCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("bnode-mgmt is cleaned up");
......@@ -99,15 +53,22 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) {
pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = bmOpenImp(pMgmt);
if (code != 0) {
dError("failed to init bnode-mgmt since %s", terrstr());
SBnodeOpt option = {0};
bmInitOption(pMgmt, &option);
pMgmt->pBnode = bndOpen(pMgmt->path, &option);
if (pMgmt->pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
bmClose(pWrapper);
} else {
dInfo("bnode-mgmt is initialized");
return -1;
}
return code;
if (bmStartWorker(pMgmt) != 0) {
dError("failed to start bnode worker since %s", terrstr());
bmClose(pWrapper);
return -1;
}
return 0;
}
void bmSetMgmtFp(SMgmtWrapper *pWrapper) {
......
......@@ -41,7 +41,7 @@ typedef struct SMnodeMgmt {
// mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed);
int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed);
// mmInt.c
int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq);
......
......@@ -105,9 +105,11 @@ PRASE_MNODE_OVER:
return code;
}
int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pWrapper->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pWrapper->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
......@@ -121,19 +123,21 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", deployed);
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < pMgmt->replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
if (pReq != NULL) {
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < pReq->replica; ++i) {
SReplica *pReplica = &pReq->replicas[i];
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < pReq->replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
}
}
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
......@@ -141,9 +145,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
taosCloseFile(&pFile);
taosMemoryFree(content);
char realfile[PATH_MAX];
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr());
......
......@@ -60,9 +60,15 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
} else {
return mmOpenFromMsg(pWrapper, &createReq);
}
bool deployed = true;
if (mmWriteFile(pWrapper, &createReq, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
return 0;
}
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
......@@ -79,10 +85,15 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
} else {
// dmCloseNode(pWrapper);
return mmDrop(pWrapper);
}
bool deployed = false;
if (mmWriteFile(pWrapper, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
return 0;
}
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
......@@ -134,12 +134,6 @@ static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
return -1;
}
bool deployed = true;
if (mmWriteFile(pMgmt, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
return 0;
}
......@@ -164,11 +158,11 @@ int32_t mmDrop(SMgmtWrapper *pWrapper) {
if (pMgmt == NULL) return 0;
dInfo("mnode-mgmt start to drop");
bool deployed = false;
if (mmWriteFile(pMgmt, deployed) != 0) {
dError("failed to drop mnode since %s", terrstr());
return -1;
}
// bool deployed = false;
// if (mmWriteFile(pMgmt, deployed) != 0) {
// dError("failed to drop mnode since %s", terrstr());
// return -1;
// }
mmCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
......@@ -229,7 +223,9 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) {
static void mmStop(SMgmtWrapper *pWrapper) {
dDebug("mnode-mgmt start to stop");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mndStop(pMgmt->pMnode);
if (pMgmt != NULL) {
mndStop(pMgmt->pMnode);
}
}
void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
......
......@@ -34,10 +34,6 @@ typedef struct SQnodeMgmt {
SSingleWorker monitorWorker;
} SQnodeMgmt;
// qmInt.c
int32_t qmOpen(SMgmtWrapper *pWrapper);
int32_t qmDrop(SMgmtWrapper *pWrapper);
// qmHandle.c
void qmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
......
......@@ -57,10 +57,15 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
} else {
// return dmOpenNode(pWrapper);
return 0;
}
bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
return -1;
}
return 0;
}
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
......@@ -77,10 +82,15 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
} else {
// dmCloseNode(pWrapper);
return qmDrop(pWrapper);
}
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
return -1;
}
return 0;
}
void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
......
......@@ -27,69 +27,23 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
pOption->msgCb = msgCb;
}
static int32_t qmOpenImp(SQnodeMgmt *pMgmt) {
SQnodeOpt option = {0};
qmInitOption(pMgmt, &option);
pMgmt->pQnode = qndOpen(&option);
if (pMgmt->pQnode == NULL) {
dError("failed to open qnode since %s", terrstr());
return -1;
}
if (qmStartWorker(pMgmt) != 0) {
dError("failed to start qnode worker since %s", terrstr());
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
return -1;
}
return 0;
}
static void qmClose(SMgmtWrapper *pWrapper) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void qmCloseImp(SQnodeMgmt *pMgmt) {
dInfo("qnode-mgmt start to cleanup");
if (pMgmt->pQnode != NULL) {
qmStopWorker(pMgmt);
qndClose(pMgmt->pQnode);
pMgmt->pQnode = NULL;
}
}
int32_t qmDrop(SMgmtWrapper *pWrapper) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
dInfo("qnode-mgmt start to drop");
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to drop qnode since %s", terrstr());
return -1;
}
qmCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("qnode-mgmt is dropped");
return 0;
}
static void qmClose(SMgmtWrapper *pWrapper) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("qnode-mgmt start to cleanup");
qmCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("qnode-mgmt is cleaned up");
}
int32_t qmOpen(SMgmtWrapper *pWrapper) {
static int32_t qmOpen(SMgmtWrapper *pWrapper) {
dInfo("qnode-mgmt start to init");
SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt));
if (pMgmt == NULL) {
......@@ -102,15 +56,23 @@ int32_t qmOpen(SMgmtWrapper *pWrapper) {
pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = qmOpenImp(pMgmt);
if (code != 0) {
dError("failed to init qnode-mgmt since %s", terrstr());
SQnodeOpt option = {0};
qmInitOption(pMgmt, &option);
pMgmt->pQnode = qndOpen(&option);
if (pMgmt->pQnode == NULL) {
dError("failed to open qnode since %s", terrstr());
qmClose(pWrapper);
} else {
dInfo("qnode-mgmt is initialized");
return -1;
}
return code;
if (qmStartWorker(pMgmt) != 0) {
dError("failed to start qnode worker since %s", terrstr());
qmClose(pWrapper);
return -1;
}
dInfo("qnode-mgmt is initialized");
return 0;
}
void qmSetMgmtFp(SMgmtWrapper *pWrapper) {
......
......@@ -36,10 +36,6 @@ typedef struct SSnodeMgmt {
SSingleWorker monitorWorker;
} SSnodeMgmt;
// smInt.c
int32_t smOpen(SMgmtWrapper *pWrapper);
int32_t smDrop(SMgmtWrapper *pWrapper);
// smHandle.c
void smInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
......
......@@ -57,10 +57,15 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
} else {
// return dmOpenNode(pWrapper);
return 0;
}
bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
return -1;
}
return 0;
}
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
......@@ -77,10 +82,15 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
} else {
return smDrop(pWrapper);
// return dmCloseNode(pWrapper);
}
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
return -1;
}
return 0;
}
void smInitMsgHandle(SMgmtWrapper *pWrapper) {
......
......@@ -24,63 +24,17 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
pOption->msgCb = msgCb;
}
static int32_t smOpenImp(SSnodeMgmt *pMgmt) {
SSnodeOpt option = {0};
smInitOption(pMgmt, &option);
pMgmt->pSnode = sndOpen(pMgmt->path, &option);
if (pMgmt->pSnode == NULL) {
dError("failed to open snode since %s", terrstr());
return -1;
}
if (smStartWorker(pMgmt) != 0) {
dError("failed to start snode worker since %s", terrstr());
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->pWrapper, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
return -1;
}
return 0;
}
static void smClose(SMgmtWrapper *pWrapper) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void smCloseImp(SSnodeMgmt *pMgmt) {
dInfo("snode-mgmt start to cleanup");
if (pMgmt->pSnode != NULL) {
smStopWorker(pMgmt);
sndClose(pMgmt->pSnode);
pMgmt->pSnode = NULL;
}
}
int32_t smDrop(SMgmtWrapper *pWrapper) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
dInfo("snode-mgmt start to drop");
bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) {
dError("failed to drop snode since %s", terrstr());
return -1;
}
smCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("snode-mgmt is dropped");
return 0;
}
static void smClose(SMgmtWrapper *pWrapper) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("snode-mgmt start to cleanup");
smCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("snode-mgmt is cleaned up");
......@@ -99,15 +53,20 @@ int32_t smOpen(SMgmtWrapper *pWrapper) {
pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = smOpenImp(pMgmt);
if (code != 0) {
dError("failed to init snode-mgmt since %s", terrstr());
smClose(pWrapper);
} else {
dInfo("snode-mgmt is initialized");
SSnodeOpt option = {0};
smInitOption(pMgmt, &option);
pMgmt->pSnode = sndOpen(pMgmt->path, &option);
if (pMgmt->pSnode == NULL) {
dError("failed to open snode since %s", terrstr());
return -1;
}
return code;
if (smStartWorker(pMgmt) != 0) {
dError("failed to start snode worker since %s", terrstr());
return -1;
}
return 0;
}
void smSetMgmtFp(SMgmtWrapper *pWrapper) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册