From 2521994923d9fe1e4c514a310dbf25653f89edf3 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 15 Nov 2021 11:58:12 +0800 Subject: [PATCH] [TD-10645][raft]refactor sync interface --- .../libs/sync/inc/sync_raft_config_change.h | 2 +- source/libs/sync/inc/sync_raft_restore.h | 2 +- source/libs/sync/src/raft.c | 22 +++++++++++++-- .../libs/sync/src/sync_raft_config_change.c | 28 +++++++++++++------ source/libs/sync/src/sync_raft_restore.c | 4 +-- 5 files changed, 43 insertions(+), 15 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_config_change.h b/source/libs/sync/inc/sync_raft_config_change.h index 98b8b49cb8..a54a7544fe 100644 --- a/source/libs/sync/inc/sync_raft_config_change.h +++ b/source/libs/sync/inc/sync_raft_config_change.h @@ -36,7 +36,7 @@ typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSi int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); -int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); #endif /* TD_SYNC_RAFT_CONFIG_CHANGE_H */ diff --git a/source/libs/sync/inc/sync_raft_restore.h b/source/libs/sync/inc/sync_raft_restore.h index f7c4ce67b5..38eadb00c7 100644 --- a/source/libs/sync/inc/sync_raft_restore.h +++ b/source/libs/sync/inc/sync_raft_restore.h @@ -19,7 +19,7 @@ #include "sync_type.h" #include "sync_raft_proto.h" -// Restore takes a Changer (which must represent an empty configuration), and +// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // runs a sequence of changes enacting the configuration described in the // ConfState. // diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index cdf2abebea..d39824c99c 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -22,6 +22,9 @@ #define RAFT_READ_LOG_MAX_NUM 100 +static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n); +static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n); + static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -35,7 +38,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SyncIndex initIndex = pInfo->snapshotIndex; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; int nBuf, limit, i; - + char* buf; + int n; + memset(pRaft, 0, sizeof(SSyncRaft)); memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM)); @@ -57,10 +62,15 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { return -1; } // read server state - if (stateManager->readServerState(stateManager, &serverState) != 0) { + if (stateManager->readServerState(stateManager, &buf, &n) != 0) { syncError("readServerState for vgid %d fail", pInfo->vgId); return -1; } + if (deserializeServerStateFromBuffer(&serverState, buf, n) != 0) { + syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId); + return -1; + } + assert(initIndex <= serverState.commitIndex); // restore fsm state from snapshot index + 1 until commitIndex @@ -119,6 +129,14 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; } +static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n) { + return 0; +} + +static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) { + return 0; +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 7bf409fba0..b80562ffa3 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -17,6 +17,7 @@ #include "sync_raft_config_change.h" #include "sync_raft_progress.h" #include "sync_raft_progress_tracker.h" +#include "sync_raft_quorum_joint.h" static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); @@ -38,8 +39,8 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* SSyncRaftProgressMap* progressMap, SyncNodeId id); static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id); -static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, - SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); // syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) // mutates the incoming majority config Voters[0] by at most one. This method // will return an error if that is not the case, if the resulting quorum is @@ -87,7 +88,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange // (Section 4.3) corresponds to `C_{new,old}`. // // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf -int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { int ret; @@ -108,9 +109,18 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSi } // Clear the outgoing config. - syncRaftJointConfigClearOutgoing(config); + syncRaftJointConfigClearOutgoing(&config->voters); // Copy incoming to outgoing. + memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster)); + + ret = applyConfig(changer, config, progressMap, css); + if (ret != 0) { + return ret; + } + + config->autoLeave = autoLeave; + return checkAndReturn(config, progressMap); } // checkAndCopy copies the tracker's config and progress map (deeply enough for @@ -210,7 +220,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig makeLearner(changer, config, progressMap, cs->nodeId); break; case SYNC_RAFT_Conf_RemoveNode: - remove(changer, config, progressMap, cs->nodeId); + removeNodeId(changer, config, progressMap, cs->nodeId); break; case SYNC_RAFT_Conf_UpdateNode: break; @@ -309,7 +319,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* progress->isLearner = false; nilAwareDelete(&config->learners, id); nilAwareDelete(&config->learnersNext, id); - syncRaftJointConfigAddToIncoming(config, id); + syncRaftJointConfigAddToIncoming(&config->voters, id); } // makeLearner makes the given ID a learner or stages it to be a learner once @@ -339,7 +349,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi return; } // Remove any existing voter in the incoming config... - remove(changer, config, progressMap, id); + removeNodeId(changer, config, progressMap, id); // ... but save the Progress. syncRaftAddToProgressMap(progressMap, id); @@ -358,8 +368,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi } } -// remove this peer as a voter or learner from the incoming config. -static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, +// removeNodeId this peer as a voter or learner from the incoming config. +static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { int i = syncRaftFindProgressIndexByNodeId(progressMap, id); if (i == -1) { diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index b7d9cc6888..d2bdbd6351 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -19,7 +19,7 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); -// Restore takes a Changer (which must represent an empty configuration), and +// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // runs a sequence of changes enacting the configuration described in the // ConfState. // @@ -70,7 +70,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) } } - ret = syncRaftChangerEnterJoint(changer, &incoming, config, progressMap); + ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap); if (ret != 0) { goto out; } -- GitLab