diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 63f24b104fe47615750e6363207d93d19e9400b7..0a6e2428fee2b073aa35c035ca3e4cb312c3aa25 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr { } SSyncIndexMgr; SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode); +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode); void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 34fc7d4eb4aa5d697d44b33716befefcf78c5747..1a5d418e7545122b48b15e1b83c4a2bef3d9a860 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -342,6 +342,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ASSERT(ret == 0); syncNodeUpdateConfig(ths, &newSyncCfg); + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5d7b5c1b21615dd47b16feee65895ff300976178..0f17cf267e8e8a54d6020f12b47bafb594c92434 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -120,6 +120,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ASSERT(ret == 0); syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index d33075054a888a1b2903fe230f6503e6c19aa580..5809cedb9038758744d20b8e6ee2270bd0720e47 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { return pSyncIndexMgr; } +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) { + pSyncIndexMgr->replicas = &(pSyncNode->replicasId); + pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; + pSyncIndexMgr->pSyncNode = pSyncNode; + syncIndexMgrClear(pSyncIndexMgr); +} + void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { if (pSyncIndexMgr != NULL) { taosMemoryFree(pSyncIndexMgr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 34313baaa97be1b1165f0d1e9dbfb6aae9a31546..469682a7b5e72fc180d8a2417e02a11e8e0bf381 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -905,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } + + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); + syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } SSyncNode* syncNodeAcquire(int64_t rid) { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 4455d25c59e6829a0e628c86e146e0f4a7384aea..9a2d9a6b3461f46c6af744b2fa503ca9ce46a6b8 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -220,7 +220,7 @@ int main(int argc, char** argv) { assert(pSyncNode != NULL); if (isConfigChange) { - configChange(rid, replicaNum, myIndex); + configChange(rid, 3, myIndex); } //---------------------------