提交 7f3c6042 编写于 作者: S Shengliang Guan

refactor: mnode sync

上级 907ff2e4
...@@ -29,7 +29,7 @@ extern "C" { ...@@ -29,7 +29,7 @@ extern "C" {
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef struct { typedef struct {
bool isStandBy; bool standby;
bool deploy; bool deploy;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
......
...@@ -53,43 +53,45 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -53,43 +53,45 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
*pDeployed = deployed->valueint; *pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) { if (mnodes != NULL) {
dError("failed to read %s since nodes not found", file); if (!mnodes || mnodes->type != cJSON_Array) {
goto _OVER; dError("failed to read %s since nodes not found", file);
}
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER; goto _OVER;
} }
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); pMgmt->replica = cJSON_GetArraySize(mnodes);
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since fqdn not found", file); dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER; goto _OVER;
} }
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port"); for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (!port || port->type != cJSON_Number) { cJSON *node = cJSON_GetArrayItem(mnodes, i);
dError("failed to read %s since port not found", file); if (node == NULL) break;
goto _OVER;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
pReplica->port = port->valueint;
} }
pReplica->port = port->valueint;
} }
code = 0; code = 0;
...@@ -122,21 +124,23 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { ...@@ -122,21 +124,23 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1); char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) { if (replica > 0) {
SReplica *pReplica = &pMgmt->replicas[i]; len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
if (pMsg != NULL) { for (int32_t i = 0; i < replica; ++i) {
pReplica = &pMsg->replicas[i]; SReplica *pReplica = &pMgmt->replicas[i];
} if (pMsg != NULL) {
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); pReplica = &pMsg->replicas[i];
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); }
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
if (i < replica - 1) { len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
} else { if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " }],\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
} }
} }
......
...@@ -39,54 +39,44 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { ...@@ -39,54 +39,44 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
} }
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
pOption->standby = false;
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->replica = 1; pOption->replica = 1;
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = tsServerPort; pReplica->port = tsServerPort;
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
} }
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->deploy = false;
pOption->standby = false;
if (pMgmt->replica > 1) { if (pMgmt->replica > 0) {
pOption->standby = true;
pOption->replica = 1; pOption->replica = 1;
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) { for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id == pMgmt->pData->dnodeId) { if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue;
pReplica->id = pMgmt->replicas[i].id; pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port; pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
}
pMgmt->selfIndex = pOption->selfIndex;
pOption->isStandBy = 1;
} else {
pOption->replica = pMgmt->replica;
pOption->selfIndex = -1;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
for (int32_t i = 0; i < pOption->replica; ++i) {
if (pOption->replicas[i].id == pMgmt->pData->dnodeId) {
pOption->selfIndex = i;
}
} }
pMgmt->selfIndex = pOption->selfIndex;
} }
pOption->deploy = false;
} }
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->standby = false;
pOption->deploy = false;
pOption->replica = pCreate->replica; pOption->replica = pCreate->replica;
pOption->selfIndex = -1; pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) { for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i]; SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pCreate->replicas[i].id; pReplica->id = pCreate->replicas[i].id;
...@@ -101,17 +91,13 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -101,17 +91,13 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
dError("failed to build mnode options since %s", terrstr()); dError("failed to build mnode options since %s", terrstr());
return -1; return -1;
} }
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
return 0; return 0;
} }
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) { if (mmBuildOptionForAlter(pMgmt, &option, pMsg) != 0) {
return -1; return -1;
} }
...@@ -119,12 +105,6 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { ...@@ -119,12 +105,6 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
return -1; return -1;
} }
bool deployed = true;
if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
return 0; return 0;
} }
...@@ -199,7 +179,8 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -199,7 +179,8 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
tmsgReportStartup("mnode-worker", "initialized"); tmsgReportStartup("mnode-worker", "initialized");
if (!deployed) { if (!deployed || pMgmt->replica > 0) {
pMgmt->replica = 0;
deployed = true; deployed = true;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) { if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
......
...@@ -79,7 +79,7 @@ typedef struct { ...@@ -79,7 +79,7 @@ typedef struct {
sem_t syncSem; sem_t syncSem;
int64_t sync; int64_t sync;
ESyncState state; ESyncState state;
bool isStandBy; bool standby;
bool restored; bool restored;
int32_t errCode; int32_t errCode;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -120,8 +120,8 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -120,8 +120,8 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncCfg *pCfg = &syncInfo.syncCfg; SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica; pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex; pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode sync, replica:%d myIndex:%d standBy:%d", pCfg->replicaNum, pCfg->myIndex, mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex,
pMgmt->isStandBy); pMgmt->standby);
for (int32_t i = 0; i < pMnode->replica; ++i) { for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
...@@ -182,7 +182,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -182,7 +182,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
if (pMgmt->isStandBy) { if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync); syncStartStandBy(pMgmt->sync);
} else { } else {
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
...@@ -201,7 +201,7 @@ bool mndIsMaster(SMnode *pMnode) { ...@@ -201,7 +201,7 @@ bool mndIsMaster(SMnode *pMnode) {
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex}; SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex};
mInfo("start to alter mnode sync, replica:%d myIndex:%d standBy:%d", cfg.replicaNum, cfg.myIndex, pOption->isStandBy); mInfo("start to alter mnode sync, replica:%d myindex:%d standby:%d", cfg.replicaNum, cfg.myIndex, pOption->standby);
for (int32_t i = 0; i < pOption->replica; ++i) { for (int32_t i = 0; i < pOption->replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
...@@ -210,6 +210,6 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -210,6 +210,6 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
} }
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->isStandBy = pOption->isStandBy; pMgmt->standby = pOption->standby;
return syncReconfig(pMgmt->sync, &cfg); return syncReconfig(pMgmt->sync, &cfg);
} }
\ No newline at end of file
...@@ -263,7 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -263,7 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb; pMnode->msgCb = pOption->msgCb;
pMnode->selfId = pOption->replicas[pOption->selfIndex].id; pMnode->selfId = pOption->replicas[pOption->selfIndex].id;
pMnode->syncMgmt.isStandBy = pOption->isStandBy; pMnode->syncMgmt.standby = pOption->standby;
} }
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
......
...@@ -55,7 +55,8 @@ ...@@ -55,7 +55,8 @@
./test.sh -f tsim/bnode/basic1.sim ./test.sh -f tsim/bnode/basic1.sim
# ---- mnode # ---- mnode
./test.sh -f tsim/mnode/basic1.sim #./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
# ---- show # ---- show
./test.sh -f tsim/show/basic.sim ./test.sh -f tsim/show/basic.sim
...@@ -104,7 +105,7 @@ ...@@ -104,7 +105,7 @@
./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m #./test.sh -f tsim/mnode/basic1.sim -m
# --- sma # --- sma
./test.sh -f tsim/sma/tsmaCreateInsertData.sim ./test.sh -f tsim/sma/tsmaCreateInsertData.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册