From ec786b9d78669ddde356e76c8f5bdba995feb14d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 24 Oct 2022 11:57:26 +0800 Subject: [PATCH] fix: alter replica one by one --- source/dnode/mnode/impl/src/mndSync.c | 16 ++++++--- source/dnode/mnode/impl/src/mndVgroup.c | 1 + source/dnode/vnode/src/vnd/vnodeCommit.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 7 ++++ source/dnode/vnode/src/vnd/vnodeSync.c | 33 +++++++++++------- source/libs/sync/src/syncMain.c | 44 ++++++++++++++---------- 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index cd6fe380e1..df4b526775 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -204,13 +204,21 @@ int32_t mndInitSync(SMnode *pMnode) { taosInitRWLatch(&pMgmt->lock); pMgmt->transId = 0; - SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; + SSyncInfo syncInfo = { + .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, + .batchSize = 1, + .vgId = 1, + .pWal = pMnode->pWal, + .msgcb = NULL, + .FpSendMsg = mndSyncSendMsg, + .FpEqMsg = mndSyncEqMsg, + .FpEqCtrlMsg = NULL, + }; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); - syncInfo.pWal = pMnode->pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); - syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT; - mInfo("vgId:1, start to open sync, selfIndex:%d replica:%d", pMgmt->selfIndex, pMgmt->numOfReplicas); + mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex); SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMgmt->numOfReplicas; pCfg->myIndex = pMgmt->selfIndex; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 9bd89c1983..3f2c6ac2a8 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1600,6 +1600,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 63a576f1a2..e593bbd602 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -107,7 +107,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // free info binary taosMemoryFree(data); - vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname); + vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 57fdb77533..0696ec0901 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -87,12 +87,19 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort); } + info.config.syncCfg = *pCfg; ret = vnodeSaveInfo(dir, &info); if (ret < 0) { vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno)); return -1; } + ret = vnodeCommitInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno)); + return -1; + } + vInfo("vgId:%d, vnode config is saved", info.config.vgId); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1863203f4a..d3ae1015d0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -563,9 +563,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * #endif } -static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode *pVnode = pFsm->data; -} +static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {} static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; @@ -605,12 +603,14 @@ static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become leader", pVnode->config.vgId); - // taosThreadMutexLock(&pVnode->lock); - // if (pVnode->blocked) { - // pVnode->blocked = false; - // tsem_post(&pVnode->syncSem); - // } - // taosThreadMutexUnlock(&pVnode->lock); +#if 0 + taosThreadMutexLock(&pVnode->lock); + if (pVnode->blocked) { + pVnode->blocked = false; + tsem_post(&pVnode->syncSem); + } + taosThreadMutexUnlock(&pVnode->lock); +#endif } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { @@ -638,10 +638,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST, - //.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT, .batchSize = 1, .vgId = pVnode->config.vgId, - .isStandBy = pVnode->config.standby, .syncCfg = pVnode->config.syncCfg, .pWal = pVnode->pWal, .msgcb = NULL, @@ -653,6 +651,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); + SSyncCfg *pCfg = &syncInfo.syncCfg; + vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex); + for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort); + } + pVnode->sync = syncOpen(&syncInfo); if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); @@ -666,11 +671,15 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { } void vnodeSyncStart(SVnode *pVnode) { + vDebug("vgId:%d, start sync", pVnode->config.vgId); syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncStart(pVnode->sync); } -void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } +void vnodeSyncClose(SVnode *pVnode) { + vDebug("vgId:%d, close sync", pVnode->config.vgId); + syncStop(pVnode->sync); +} bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 00c4ea76aa..b350f0925e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -90,7 +90,7 @@ void syncCleanUp() { int64_t syncOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); if (pSyncNode == NULL) { - sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); + sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; } @@ -106,7 +106,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { } void syncStart(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } @@ -121,7 +121,7 @@ void syncStart(int64_t rid) { } void syncStartNormal(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } @@ -131,7 +131,7 @@ void syncStartNormal(int64_t rid) { } void syncStartStandBy(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } @@ -141,7 +141,7 @@ void syncStartStandBy(int64_t rid) { } void syncStop(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) return; int32_t vgId = pSyncNode->vgId; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -151,7 +151,7 @@ void syncStop(int64_t rid) { } int32_t syncSetStandby(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("failed to set standby since accquire ref error, rid:%" PRId64, rid); @@ -1089,17 +1089,13 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } -// open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { - SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - - SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode)); +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { + SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - int32_t ret = 0; if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -1111,32 +1107,44 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP); if (!taosCheckExistFile(pSyncNode->configPath)) { // create a new raft config file - SRaftCfgMeta meta; + SRaftCfgMeta meta = {0}; meta.isStandBy = pSyncInfo->isStandBy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy; meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.batchSize = pSyncInfo->batchSize; - ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); - if (ret != 0) { - sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); + if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) { + sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath); goto _error; } if (pSyncInfo->syncCfg.replicaNum == 0) { + sInfo("vgId:%d, sync config not input", pSyncNode->vgId); pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; } } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); if (pSyncNode->pRaftCfg == NULL) { - sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath); goto _error; } - pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; + if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) { + sInfo("vgId:%d, use sync config from input options", pSyncNode->vgId); + } else { + sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId); + pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; + } raftCfgClose(pSyncNode->pRaftCfg); pSyncNode->pRaftCfg = NULL; } + SSyncCfg* pCfg = &pSyncInfo->syncCfg; + sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); + for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + SNodeInfo* pNode = &pCfg->nodeInfo[i]; + sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort); + } + // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); -- GitLab