提交 70b513c0 编写于 作者: S Shengliang Guan

fix: vnode set the wrong replica info after snapshot transfered

上级 650ea06b
......@@ -132,7 +132,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->syncCfg.myIndex = pCreate->selfIndex;
pCfg->syncCfg.replicaNum = pCreate->replica;
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int i = 0; i < pCreate->replica; ++i) {
for (int32_t i = 0; i < pCreate->replica; ++i) {
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
pNode->nodeId = pCreate->replicas[i].id;
pNode->nodePort = pCreate->replicas[i].port;
......@@ -288,7 +288,8 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica,
alterReq.selfIndex, alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", alterReq.vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
}
if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) {
......
......@@ -128,14 +128,19 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *nodeInfo = tjsonCreateArray();
if (nodeInfo == NULL) return -1;
if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1;
vDebug("vgId:%d, encode config, replicas:%d selfIndex:%d", pCfg->vgId, pCfg->syncCfg.replicaNum,
pCfg->syncCfg.myIndex);
for (int i = 0; i < pCfg->syncCfg.replicaNum; ++i) {
SJson *info = tjsonCreateObject();
SJson *info = tjsonCreateObject();
SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i];
if (info == NULL) return -1;
if (tjsonAddIntegerToObject(info, "nodePort", pCfg->syncCfg.nodeInfo[i].nodePort) < 0) return -1;
if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->syncCfg.nodeInfo[i].nodeFqdn) < 0) return -1;
if (tjsonAddIntegerToObject(info, "nodeId", pCfg->syncCfg.nodeInfo[i].nodeId) < 0) return -1;
if (tjsonAddIntegerToObject(info, "clusterId", pCfg->syncCfg.nodeInfo[i].clusterId) < 0) return -1;
if (tjsonAddIntegerToObject(info, "nodePort", pNode->nodePort) < 0) return -1;
if (tjsonAddStringToObject(info, "nodeFqdn", pNode->nodeFqdn) < 0) return -1;
if (tjsonAddIntegerToObject(info, "nodeId", pNode->nodeId) < 0) return -1;
if (tjsonAddIntegerToObject(info, "clusterId", pNode->clusterId) < 0) return -1;
if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;
vDebug("vgId:%d, encode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId);
}
return 0;
......@@ -248,15 +253,20 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
int arraySize = tjsonGetArraySize(nodeInfo);
if (arraySize != pCfg->syncCfg.replicaNum) return -1;
vDebug("vgId:%d, decode config, replicas:%d selfIndex:%d", pCfg->vgId, pCfg->syncCfg.replicaNum,
pCfg->syncCfg.myIndex);
for (int i = 0; i < arraySize; ++i) {
SJson *info = tjsonGetArrayItem(nodeInfo, i);
SJson *info = tjsonGetArrayItem(nodeInfo, i);
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
if (info == NULL) return -1;
tjsonGetNumberValue(info, "nodePort", pCfg->syncCfg.nodeInfo[i].nodePort, code);
tjsonGetNumberValue(info, "nodePort", pNode->nodePort, code);
if (code < 0) return -1;
tjsonGetStringValue(info, "nodeFqdn", pCfg->syncCfg.nodeInfo[i].nodeFqdn);
tjsonGetStringValue(info, "nodeFqdn", pNode->nodeFqdn);
if (code < 0) return -1;
tjsonGetNumberValue(info, "nodeId", pCfg->syncCfg.nodeInfo[i].nodeId, code);
tjsonGetNumberValue(info, "clusterId", pCfg->syncCfg.nodeInfo[i].clusterId, code);
tjsonGetNumberValue(info, "nodeId", pNode->nodeId, code);
tjsonGetNumberValue(info, "clusterId", pNode->clusterId, code);
vDebug("vgId:%d, decode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId);
}
tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code);
......
......@@ -105,8 +105,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary
taosMemoryFree(data);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
return 0;
......@@ -206,6 +206,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
......
......@@ -48,6 +48,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.state.applied = -1;
info.state.commitID = 0;
vInfo("vgId:%d, save config while create", pCfg->vgId);
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
return -1;
......@@ -79,14 +80,14 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
pCfg->replicaNum = pReq->replica;
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
vInfo("vgId:%d, save config while alter, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int i = 0; i < pReq->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
pNode->nodeId = pReq->replicas[i].id;
pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
}
info.config.syncCfg = *pCfg;
......
......@@ -405,6 +405,10 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
SVnode *pVnode = pWriter->pVnode;
pWriter->info.config = pVnode->config;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
if (vnodeSaveInfo(dir, &pWriter->info) < 0) {
code = terrno;
goto _exit;
......
......@@ -851,6 +851,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
......@@ -894,7 +895,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pNode->nodeId, pNode->clusterId);
}
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
......@@ -1656,7 +1656,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// persist cfg
syncWriteCfgFile(pSyncNode);
// change isStandBy to normal (election timeout)
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, "");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册