提交 25e5f026 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

......@@ -47,7 +47,7 @@ static void dmSetSignalHandle() {
taosSetSignal(SIGQUIT, dmStopDnode);
if (!tsMultiProcess) {
} else if (global.ntype == NODE_BEGIN || global.ntype == NODE_END) {
} else if (global.ntype == DNODE || global.ntype == NODE_END) {
taosIgnSignal(SIGCHLD);
} else {
taosKillChildOnParentStopped();
......@@ -73,7 +73,7 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
tstrncpy(global.envFile, argv[++i], PATH_MAX);
} else if (strcmp(argv[i], "-n") == 0) {
global.ntype = atoi(argv[++i]);
if (global.ntype <= NODE_BEGIN || global.ntype > NODE_END) {
if (global.ntype <= DNODE || global.ntype > NODE_END) {
printf("'-n' range is [1 - %d], default is 0\n", NODE_END - 1);
return -1;
}
......@@ -133,7 +133,7 @@ static int32_t dmInitLog() {
static void dmSetProcInfo(int32_t argc, char **argv) {
taosSetProcPath(argc, argv);
if (global.ntype != NODE_BEGIN && global.ntype != NODE_END) {
if (global.ntype != DNODE && global.ntype != NODE_END) {
const char *name = dmProcName(global.ntype);
taosSetProcName(argc, argv, name);
}
......
......@@ -174,7 +174,7 @@ static int32_t dmRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process");
pDnode->ptype = DND_PROC_SINGLE;
for (EDndNodeType n = NODE_BEGIN; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
......@@ -215,13 +215,13 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) {
dInfo("dnode run in parent process");
pDnode->ptype = DND_PROC_PARENT;
SMgmtWrapper *pDWrapper = &pDnode->wrappers[NODE_BEGIN];
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
if (dmOpenNodeImp(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
......@@ -233,7 +233,7 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) {
return -1;
}
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (dmRunNodeProc(pWrapper) != 0) return -1;
......@@ -254,7 +254,7 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) {
dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED);
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_END) continue;
......@@ -269,7 +269,7 @@ static int32_t dmRunInParentProcess(SDnode *pDnode) {
}
break;
} else {
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_END) continue;
......@@ -347,7 +347,7 @@ static int32_t dmRunInChildProcess(SDnode *pDnode) {
int32_t dmRun(SDnode *pDnode) {
if (!tsMultiProcess) {
return dmRunInSingleProcess(pDnode);
} else if (pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) {
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
return dmRunInParentProcess(pDnode);
} else {
return dmRunInChildProcess(pDnode);
......
......@@ -55,7 +55,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
SRpcMsg req = {0};
SRpcMsg rsp;
SEpSet epset = {.inUse = 0, .numOfEps = 1};
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(epset.eps[0].fqdn, pDnode->data.localFqdn, TSDB_FQDN_LEN);
epset.eps[0].port = tsServerPort;
SMgmtWrapper *pWrapper = NULL;
......@@ -172,7 +172,7 @@ void dmGetVnodeLoads(SDnode *pDnode, SMonVloadInfo *pInfo) {
SRpcMsg req = {.msgType = TDMT_MON_VM_LOAD};
SRpcMsg rsp = {0};
SEpSet epset = {.inUse = 0, .numOfEps = 1};
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(epset.eps[0].fqdn, pDnode->data.localFqdn, TSDB_FQDN_LEN);
epset.eps[0].port = tsServerPort;
dmSendRecv(pDnode, &epset, &req, &rsp);
......
......@@ -18,19 +18,21 @@
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->data.dnodeId = 0;
pDnode->data.dropped = 0;
pDnode->data.clusterId = 0;
pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
pDnode->data.serverPort = pOption->serverPort;
pDnode->data.dataDir = strdup(pOption->dataDir);
pDnode->data.dnodeVer = 0;
pDnode->data.updateTime = 0;
pDnode->data.rebootTime = taosGetTimestampMs();
pDnode->data.dropped = 0;
pDnode->data.localEp = strdup(pOption->localEp);
pDnode->data.localFqdn = strdup(pOption->localFqdn);
pDnode->data.firstEp = strdup(pOption->firstEp);
pDnode->data.secondEp = strdup(pOption->secondEp);
pDnode->data.dataDir = strdup(pOption->dataDir);
pDnode->data.disks = pOption->disks;
pDnode->data.numOfDisks = pOption->numOfDisks;
pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
pDnode->data.serverPort = pOption->serverPort;
pDnode->ntype = pOption->ntype;
pDnode->data.rebootTime = taosGetTimestampMs();
if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL ||
pDnode->data.firstEp == NULL || pDnode->data.secondEp == NULL) {
......@@ -38,7 +40,7 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
return -1;
}
if (!tsMultiProcess || pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) {
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->data.lockfile = dmCheckRunning(pDnode->data.dataDir);
if (pDnode->data.lockfile == NULL) {
return -1;
......@@ -87,7 +89,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
}
dmSetStatus(pDnode, DND_STAT_INIT);
dmSetMgmtFp(&pDnode->wrappers[NODE_BEGIN]);
dmSetMgmtFp(&pDnode->wrappers[DNODE]);
mmSetMgmtFp(&pDnode->wrappers[MNODE]);
vmSetMgmtFp(&pDnode->wrappers[VNODE]);
qmSetMgmtFp(&pDnode->wrappers[QNODE]);
......
......@@ -174,7 +174,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t dmInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
......
......@@ -42,7 +42,7 @@
extern "C" {
#endif
typedef enum { NODE_BEGIN, VNODE, QNODE, SNODE, MNODE, BNODE, NODE_END } EDndNodeType;
typedef enum { DNODE, VNODE, QNODE, SNODE, MNODE, BNODE, NODE_END } EDndNodeType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndRunStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EDndEnvStatus;
typedef enum { DND_PROC_SINGLE, DND_PROC_CHILD, DND_PROC_PARENT } EDndProcType;
......
......@@ -164,7 +164,7 @@ int32_t dmReadShmFile(SDnode *pDnode) {
goto _OVER;
}
for (EDndNodeType ntype = NODE_BEGIN + 1; ntype < NODE_END; ++ntype) {
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dmProcName(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) {
......@@ -179,8 +179,8 @@ int32_t dmReadShmFile(SDnode *pDnode) {
}
}
if (!tsMultiProcess || pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) {
for (EDndNodeType ntype = NODE_BEGIN; ntype < NODE_END; ++ntype) {
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->procShm.id >= 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->procShm.id, pWrapper->procShm.size);
......@@ -226,7 +226,7 @@ int32_t dmWriteShmFile(SDnode *pDnode) {
}
len += snprintf(content + len, MAXLEN - len, "{\n");
for (EDndNodeType ntype = NODE_BEGIN + 1; ntype < NODE_END; ++ntype) {
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dmProcName(ntype), pWrapper->procShm.id);
if (ntype == NODE_END - 1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册