提交 ec786b9d 编写于 作者: S Shengliang Guan

fix: alter replica one by one

上级 bbb0475d
...@@ -204,13 +204,21 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -204,13 +204,21 @@ int32_t mndInitSync(SMnode *pMnode) {
taosInitRWLatch(&pMgmt->lock); taosInitRWLatch(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
.batchSize = 1,
.vgId = 1,
.pWal = pMnode->pWal,
.msgcb = NULL,
.FpSendMsg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = NULL,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
syncInfo.pWal = pMnode->pWal;
syncInfo.pFsm = mndSyncMakeFsm(pMnode); syncInfo.pFsm = mndSyncMakeFsm(pMnode);
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
mInfo("vgId:1, start to open sync, selfIndex:%d replica:%d", pMgmt->selfIndex, pMgmt->numOfReplicas); mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
SSyncCfg *pCfg = &syncInfo.syncCfg; SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMgmt->numOfReplicas; pCfg->replicaNum = pMgmt->numOfReplicas;
pCfg->myIndex = pMgmt->selfIndex; pCfg->myIndex = pMgmt->selfIndex;
......
...@@ -1600,6 +1600,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb ...@@ -1600,6 +1600,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1; return -1;
......
...@@ -107,7 +107,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -107,7 +107,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary // free info binary
taosMemoryFree(data); taosMemoryFree(data);
vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname); vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum);
return 0; return 0;
......
...@@ -87,12 +87,19 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { ...@@ -87,12 +87,19 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort); vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
} }
info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info); ret = vnodeSaveInfo(dir, &info);
if (ret < 0) { if (ret < 0) {
vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno)); vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
return -1; return -1;
} }
ret = vnodeCommitInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, vnode config is saved", info.config.vgId); vInfo("vgId:%d, vnode config is saved", info.config.vgId);
return 0; return 0;
} }
......
...@@ -563,9 +563,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * ...@@ -563,9 +563,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif #endif
} }
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {}
SVnode *pVnode = pFsm->data;
}
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
...@@ -605,12 +603,14 @@ static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { ...@@ -605,12 +603,14 @@ static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become leader", pVnode->config.vgId); vDebug("vgId:%d, become leader", pVnode->config.vgId);
// taosThreadMutexLock(&pVnode->lock); #if 0
// if (pVnode->blocked) { taosThreadMutexLock(&pVnode->lock);
// pVnode->blocked = false; if (pVnode->blocked) {
// tsem_post(&pVnode->syncSem); pVnode->blocked = false;
// } tsem_post(&pVnode->syncSem);
// taosThreadMutexUnlock(&pVnode->lock); }
taosThreadMutexUnlock(&pVnode->lock);
#endif
} }
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
...@@ -638,10 +638,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -638,10 +638,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST, .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
//.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
.batchSize = 1, .batchSize = 1,
.vgId = pVnode->config.vgId, .vgId = pVnode->config.vgId,
.isStandBy = pVnode->config.standby,
.syncCfg = pVnode->config.syncCfg, .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal, .pWal = pVnode->pWal,
.msgcb = NULL, .msgcb = NULL,
...@@ -653,6 +651,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -653,6 +651,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
SSyncCfg *pCfg = &syncInfo.syncCfg;
vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
}
pVnode->sync = syncOpen(&syncInfo); pVnode->sync = syncOpen(&syncInfo);
if (pVnode->sync <= 0) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
...@@ -666,11 +671,15 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -666,11 +671,15 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
} }
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId);
syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
} }
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) {
vDebug("vgId:%d, close sync", pVnode->config.vgId);
syncStop(pVnode->sync);
}
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; } bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
......
...@@ -90,7 +90,7 @@ void syncCleanUp() { ...@@ -90,7 +90,7 @@ void syncCleanUp() {
int64_t syncOpen(SSyncInfo* pSyncInfo) { int64_t syncOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
return -1; return -1;
} }
...@@ -106,7 +106,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { ...@@ -106,7 +106,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
} }
void syncStart(int64_t rid) { void syncStart(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
...@@ -121,7 +121,7 @@ void syncStart(int64_t rid) { ...@@ -121,7 +121,7 @@ void syncStart(int64_t rid) {
} }
void syncStartNormal(int64_t rid) { void syncStartNormal(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
...@@ -131,7 +131,7 @@ void syncStartNormal(int64_t rid) { ...@@ -131,7 +131,7 @@ void syncStartNormal(int64_t rid) {
} }
void syncStartStandBy(int64_t rid) { void syncStartStandBy(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
...@@ -141,7 +141,7 @@ void syncStartStandBy(int64_t rid) { ...@@ -141,7 +141,7 @@ void syncStartStandBy(int64_t rid) {
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
int32_t vgId = pSyncNode->vgId; int32_t vgId = pSyncNode->vgId;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
...@@ -151,7 +151,7 @@ void syncStop(int64_t rid) { ...@@ -151,7 +151,7 @@ void syncStop(int64_t rid) {
} }
int32_t syncSetStandby(int64_t rid) { int32_t syncSetStandby(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("failed to set standby since accquire ref error, rid:%" PRId64, rid); sError("failed to set standby since accquire ref error, rid:%" PRId64, rid);
...@@ -1089,17 +1089,13 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { ...@@ -1089,17 +1089,13 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
return ret; return ret;
} }
// open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
int32_t ret = 0;
if (!taosDirExist((char*)(pSyncInfo->path))) { if (!taosDirExist((char*)(pSyncInfo->path))) {
if (taosMkDir(pSyncInfo->path) != 0) { if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -1111,32 +1107,44 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { ...@@ -1111,32 +1107,44 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
if (!taosCheckExistFile(pSyncNode->configPath)) { if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file // create a new raft config file
SRaftCfgMeta meta; SRaftCfgMeta meta = {0};
meta.isStandBy = pSyncInfo->isStandBy; meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize; meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
if (ret != 0) { sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
goto _error; goto _error;
} }
if (pSyncInfo->syncCfg.replicaNum == 0) { if (pSyncInfo->syncCfg.replicaNum == 0) {
sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
} }
} else { } else {
// update syncCfg by raft_config.json // update syncCfg by raft_config.json
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
if (pSyncNode->pRaftCfg == NULL) { if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
goto _error; goto _error;
} }
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) {
sInfo("vgId:%d, use sync config from input options", pSyncNode->vgId);
} else {
sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
}
raftCfgClose(pSyncNode->pRaftCfg); raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL; pSyncNode->pRaftCfg = NULL;
} }
SSyncCfg* pCfg = &pSyncInfo->syncCfg;
sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i];
sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
}
// init by SSyncInfo // init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->vgId = pSyncInfo->vgId;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册