提交 01521a25 编写于 作者: S Shengliang Guan

refactor: mnode file

上级 877cb761
......@@ -34,16 +34,14 @@ typedef struct SMnodeMgmt {
SSingleWorker writeWorker;
SSingleWorker syncWorker;
SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
} SMnodeMgmt;
// mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed);
// mmInt.c
int32_t mmAcquire(SMnodeMgmt *pMgmt);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0;
int32_t maxLen = 4096;
......@@ -52,45 +52,36 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
}
*pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (mnodes != NULL) {
if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file);
cJSON *id = cJSON_GetObjectItem(root, "id");
if (id) {
if (id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
if (pReplica) {
pReplica->id = id->valueint;
}
}
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn");
if (fqdn) {
if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
if (pReplica) {
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
}
}
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
pReplica->port = port->valueint;
cJSON *port = cJSON_GetObjectItem(root, "port");
if (port) {
if (port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
if (pReplica) {
pReplica->port = (uint16_t)port->valueint;
}
}
......@@ -106,7 +97,7 @@ _OVER:
return code;
}
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
......@@ -124,26 +115,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
if (replica > 0) {
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
}
if (pReplica != NULL && pReplica->id > 0) {
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port);
}
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, maxLen - len, "}\n");
......
......@@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, &createReq, deployed) != 0) {
if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
......
......@@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
if (mmReadFile(&mgmt, required) != 0) {
if (mmReadFile(&mgmt, NULL, required) != 0) {
return -1;
}
......@@ -52,23 +52,17 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
}
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) {
pOption->standby = false;
pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
if (pMgmt->replica > 0) {
if (pReplica->id > 0) {
pOption->standby = true;
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue;
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
memcpy(&pOption->replicas[0], pReplica, sizeof(SReplica));
}
}
......@@ -108,8 +102,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) {
bool deployed = false;
SReplica replica = {0};
if (mmReadFile(pMgmt, &replica, &deployed) != 0) {
dError("failed to read file since %s", terrstr());
mmClose(pMgmt);
return -1;
......@@ -122,7 +117,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else {
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);
mmBuildOptionForOpen(pMgmt, &replica, &option);
}
pMgmt->pMnode = mndOpen(pMgmt->path, &option);
......@@ -140,8 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("mnode-worker", "initialized");
if (!deployed || pMgmt->replica > 0) {
pMgmt->replica = 0;
if (!deployed || replica.id > 0) {
deployed = true;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
......
......@@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dInfo("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
(void)dmStartNode(pWrapper);
pWrapper->required = true;
code = dmOpenNode(pWrapper);
if (code != 0) {
code = dmStartNode(pWrapper);
}
pWrapper->deployed = true;
pWrapper->required = true;
pWrapper->proc.ptype = pDnode->ptype;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册