From 956ab45a3043bd0c06a4aacbece898256d4eb8ab Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 16 May 2022 14:10:18 +0800 Subject: [PATCH] enh(sync): raft config change --- source/libs/sync/inc/syncIndexMgr.h | 1 + source/libs/sync/src/syncAppendEntries.c | 5 +++++ source/libs/sync/src/syncCommit.c | 5 +++++ source/libs/sync/src/syncIndexMgr.c | 7 +++++++ source/libs/sync/src/syncMain.c | 5 +++++ source/libs/sync/test/syncConfigChangeTest.cpp | 2 +- 6 files changed, 24 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 63f24b104f..0a6e2428fe 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr { } SSyncIndexMgr; SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode); +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode); void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 34fc7d4eb4..1a5d418e75 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -342,6 +342,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ASSERT(ret == 0); syncNodeUpdateConfig(ths, &newSyncCfg); + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5d7b5c1b21..0f17cf267e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -120,6 +120,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ASSERT(ret == 0); syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index d33075054a..5809cedb90 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { return pSyncIndexMgr; } +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) { + pSyncIndexMgr->replicas = &(pSyncNode->replicasId); + pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; + pSyncIndexMgr->pSyncNode = pSyncNode; + syncIndexMgrClear(pSyncIndexMgr); +} + void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { if (pSyncIndexMgr != NULL) { taosMemoryFree(pSyncIndexMgr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 34313baaa9..469682a7b5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -905,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } + + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); + syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } SSyncNode* syncNodeAcquire(int64_t rid) { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 4455d25c59..9a2d9a6b34 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -220,7 +220,7 @@ int main(int argc, char** argv) { assert(pSyncNode != NULL); if (isConfigChange) { - configChange(rid, replicaNum, myIndex); + configChange(rid, 3, myIndex); } //--------------------------- -- GitLab