diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 5b6efb95e5f0eb53d536cddb13e408f1dac404d0..129f0f4dbccaeeb52458b7bcd5560168edb14c12 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -18,6 +18,7 @@ #include "sync.h" #include "sync_type.h" +#include "thash.h" #include "raft_message.h" #include "sync_raft_impl.h" #include "sync_raft_quorum.h" @@ -43,9 +44,9 @@ struct SSyncRaft { // owner sync node SSyncNode* pNode; - SSyncCluster cluster; + // hash map nodeId -> SNodeInfo* + SHashObj* nodeInfoMap; - int selfIndex; SyncNodeId selfId; SyncGroupId selfGroupId; diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h deleted file mode 100644 index ac9bbb5e55d89f0f1c4952dad15c1c900828b631..0000000000000000000000000000000000000000 --- a/source/libs/sync/inc/raft_configuration.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ - -#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H -#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H - -#include "sync.h" -#include "sync_type.h" - -// return -1 if cannot find this id -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); - -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); - -#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index dc10c59b284474888d1adbe86a13ceefea81bdf0..117ed42c2cd4cd6655b7dedd64128763c9f05025 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -39,8 +39,6 @@ struct SSyncRaftLog { SyncIndex commitIndex; SyncIndex appliedIndex; - - }; SSyncRaftLog* syncRaftLogOpen(); diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h index d0e55ef10ef3292468fcdcce7e89515f5f48956b..180a2db61f8a553d6807dc4a52cd3b397fbcb067 100644 --- a/source/libs/sync/inc/raft_replication.h +++ b/source/libs/sync/inc/raft_replication.h @@ -20,11 +20,11 @@ #include "syncInt.h" #include "sync_type.h" -// syncRaftReplicate sends an append RPC with new entries to the given peer, +// syncRaftMaybeSendAppend sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). 
-bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); +bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); #endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/src/raft_configuration.c b/source/libs/sync/inc/sync_const.h similarity index 72% rename from source/libs/sync/src/raft_configuration.c rename to source/libs/sync/inc/sync_const.h index e16cb349892a78fef6ac58f1fb3d0bc9ecd66064..b49c17f82e33afe50e5d4175492d94c1628c8eee 100644 --- a/source/libs/sync/src/raft_configuration.c +++ b/source/libs/sync/inc/sync_const.h @@ -13,13 +13,13 @@ * along with this program. If not, see . */ -#include "raft_configuration.h" -#include "raft.h" +#ifndef _TD_LIBS_SYNC_CONST_H +#define _TD_LIBS_SYNC_CONST_H -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) { - return (int)(id); -} +#include "sync.h" -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) { - return pRaft->cluster.replica; -} \ No newline at end of file +static int kSyncRaftMaxInflghtMsgs = 20; + +static SyncIndex kMaxCommitIndex = UINT64_MAX; + +#endif /* _TD_LIBS_SYNC_CONST_H */ diff --git a/source/libs/sync/inc/sync_raft_config_change.h b/source/libs/sync/inc/sync_raft_config_change.h index a54a7544fe2d77b85652a3ff71413caa9db8bc2b..75a29f35e8052858423e6bf3755f6a2ba0065ec6 100644 --- a/source/libs/sync/inc/sync_raft_config_change.h +++ b/source/libs/sync/inc/sync_raft_config_change.h @@ -33,6 +33,11 @@ struct SSyncRaftChanger { typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +// Simple carries out a series of configuration changes that (in aggregate) +// mutates the incoming majority config Voters[0] by at most one. This method +// will return an error if that is not the case, if the resulting quorum is +// zero, or if the configuration is in a joint state (i.e. 
if there is an +// outgoing configuration). int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index bd77978c28cb42e03b47ea035e9a5cc4122733da..1a6c13f65f477b0eff38c31e454217c4c4e3321f 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -28,6 +28,8 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft); void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType); +void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType); + void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); @@ -51,4 +53,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); void syncRaftBroadcastAppend(SSyncRaft* pRaft); +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id); + #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/inc/sync_raft_inflights.h b/source/libs/sync/inc/sync_raft_inflights.h index 6d249c9274452d28604c335dce649833b225ea6e..627bf9a26f28eaecd752b7afbc7e4c3f7508bdb2 100644 --- a/source/libs/sync/inc/sync_raft_inflights.h +++ b/source/libs/sync/inc/sync_raft_inflights.h @@ -18,54 +18,47 @@ #include "sync.h" -/** - * SSyncRaftInflights limits the number of MsgApp (represented by the largest index - * contained within) sent to followers but not yet acknowledged by them. Callers - * use syncRaftInflightFull() to check whether more messages can be sent, - * call syncRaftInflightAdd() whenever they are sending a new append, - * and release "quota" via FreeLE() whenever an ack is received. -**/ +// Inflights limits the number of MsgApp (represented by the largest index +// contained within) sent to followers but not yet acknowledged by them. 
Callers +// use Full() to check whether more messages can be sent, call Add() whenever +// they are sending a new append, and release "quota" via FreeLE() whenever an +// ack is received. typedef struct SSyncRaftInflights { - /* the starting index in the buffer */ + // the starting index in the buffer int start; - /* number of inflights in the buffer */ + // number of inflights in the buffer int count; - /* the size of the buffer */ + // the size of the buffer int size; - /** - * buffer contains the index of the last entry - * inside one message. - **/ + // buffer contains the index of the last entry + // inside one message. SyncIndex* buffer; } SSyncRaftInflights; SSyncRaftInflights* syncRaftOpenInflights(int size); void syncRaftCloseInflights(SSyncRaftInflights*); +// reset frees all inflights. static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { inflights->count = 0; inflights->start = 0; } +// Full returns true if no more messages can be sent at the moment. static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { return inflights->count == inflights->size; } -/** - * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being - * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() - * to verify that there is room for one more message, - * and consecutive calls to add syncRaftInflightAdd() must provide a - * monotonic sequence of indexes. - **/ +// Add notifies the Inflights that a new message with the given index is being +// dispatched. Full() must be called prior to Add() to verify that there is room +// for one more message, and consecutive calls to Add() must provide a +// monotonic sequence of indexes. void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); -/** - * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. 
- **/ +// FreeLE frees the inflights smaller or equal to the given `to` flight. void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); /** diff --git a/source/libs/sync/inc/sync_raft_node_map.h b/source/libs/sync/inc/sync_raft_node_map.h new file mode 100644 index 0000000000000000000000000000000000000000..b4cf04056d2aaa76cc99b54ba903f8b0b86192ed --- /dev/null +++ b/source/libs/sync/inc/sync_raft_node_map.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H +#define _TD_LIBS_SYNC_RAFT_NODE_MAP_H + +#include "thash.h" +#include "sync.h" +#include "sync_type.h" + +struct SSyncRaftNodeMap { + SHashObj* nodeIdMap; +}; + +void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap); +void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap); + +void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap); + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to); + +void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap); + +// return true if reach the end +bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId); + +bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap); + +#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 173608a40a291bdf4772036f0c32b30880b7ec1e..32c21281cde4e1479c91b8ea5034aa3a81327970 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -18,6 +18,7 @@ #include "sync_type.h" #include "sync_raft_inflights.h" +#include "thash.h" /** * State defines how the leader should interact with the follower. @@ -64,141 +65,123 @@ static const char* kProgressStateString[] = { "Snapshot", }; -/** - * Progress represents a follower’s progress in the view of the leader. Leader maintains - * progresses of all followers, and sends entries to the follower based on its progress. - **/ +// Progress represents a follower’s progress in the view of the leader. 
Leader +// maintains progresses of all followers, and sends entries to the follower +// based on its progress. +// +// NB(tbg): Progress is basically a state machine whose transitions are mostly +// strewn around `*raft.raft`. Additionally, some fields are only used when in a +// certain State. All of this isn't ideal. struct SSyncRaftProgress { - // index in raft cluster config - int selfIndex; + SyncGroupId groupId; SyncNodeId id; + int16_t refCount; + SyncIndex nextIndex; SyncIndex matchIndex; - /** - * State defines how the leader should interact with the follower. - * - * When in StateProbe, leader sends at most one replication message - * per heartbeat interval. It also probes actual progress of the follower. - * - * When in StateReplicate, leader optimistically increases next - * to the latest entry sent after sending replication message. This is - * an optimized state for fast replicating log entries to the follower. - * - * When in StateSnapshot, leader should have sent out snapshot - * before and stops sending any replication message. - **/ + // State defines how the leader should interact with the follower. + // + // When in StateProbe, leader sends at most one replication message + // per heartbeat interval. It also probes actual progress of the follower. + // + // When in StateReplicate, leader optimistically increases next + // to the latest entry sent after sending replication message. This is + // an optimized state for fast replicating log entries to the follower. + // + // When in StateSnapshot, leader should have sent out snapshot + // before and stops sending any replication message. ESyncRaftProgressState state; - /** - * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. - * If there is a pending snapshot, the pendingSnapshotIndex will be set to the - * index of the snapshot. If pendingSnapshotIndex is set, the replication process of - * this Progress will be paused. 
raft will not resend snapshot until the pending one - * is reported to be failed. - **/ + // PendingSnapshot is used in StateSnapshot. + // If there is a pending snapshot, the pendingSnapshot will be set to the + // index of the snapshot. If pendingSnapshot is set, the replication process of + // this Progress will be paused. raft will not resend snapshot until the pending one + // is reported to be failed. SyncIndex pendingSnapshotIndex; - /** - * recentActive is true if the progress is recently active. Receiving any messages - * from the corresponding follower indicates the progress is active. - * RecentActive can be reset to false after an election timeout. - **/ + // RecentActive is true if the progress is recently active. Receiving any messages + // from the corresponding follower indicates the progress is active. + // RecentActive can be reset to false after an election timeout. + // + // TODO(tbg): the leader should always have this set to true. bool recentActive; - /** - * probeSent is used while this follower is in StateProbe. When probeSent is - * true, raft should pause sending replication message to this peer until - * probeSent is reset. See ProbeAcked() and IsPaused(). - **/ + // ProbeSent is used while this follower is in StateProbe. When ProbeSent is + // true, raft should pause sending replication message to this peer until + // ProbeSent is reset. See ProbeAcked() and IsPaused(). bool probeSent; - /** - * inflights is a sliding window for the inflight messages. - * Each inflight message contains one or more log entries. - * The max number of entries per message is defined in raft config as MaxSizePerMsg. - * Thus inflight effectively limits both the number of inflight messages - * and the bandwidth each Progress can use. - * When inflights is Full, no more message should be sent. - * When a leader sends out a message, the index of the last - * entry should be added to inflights. The index MUST be added - * into inflights in order. 
- * When a leader receives a reply, the previous inflights should - * be freed by calling inflights.FreeLE with the index of the last - * received entry. - **/ + // Inflights is a sliding window for the inflight messages. + // Each inflight message contains one or more log entries. + // The max number of entries per message is defined in raft config as MaxSizePerMsg. + // Thus inflight effectively limits both the number of inflight messages + // and the bandwidth each Progress can use. + // When inflights is Full, no more message should be sent. + // When a leader sends out a message, the index of the last + // entry should be added to inflights. The index MUST be added + // into inflights in order. + // When a leader receives a reply, the previous inflights should + // be freed by calling inflights.FreeLE with the index of the last + // received entry. SSyncRaftInflights* inflights; - /** - * IsLearner is true if this progress is tracked for a learner. - **/ + // IsLearner is true if this progress is tracked for a learner. bool isLearner; }; struct SSyncRaftProgressMap { - SSyncRaftProgress progress[TSDB_MAX_REPLICA]; + // map nodeId -> SSyncRaftProgress* + SHashObj* progressMap; }; - static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) { return kProgressStateString[progress->state]; } -void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); +void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); -/** - * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, - * optionally and if larger, the index of the pending snapshot. - **/ +// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, +// optionally and if larger, the index of the pending snapshot. void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); -/** - * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. 
- **/ +// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); -/** - * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the - * index acked by it. The method returns false if the given n index comes from - * an outdated message. Otherwise it updates the progress and returns true. - **/ +// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the +// index acked by it. The method returns false if the given n index comes from +// an outdated message. Otherwise it updates the progress and returns true. bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); -/** - * syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n - * are in-flight. As a result, Next is increased to n+1. - **/ +// OptimisticUpdate signals that appends all the way up to and including index n +// are in-flight. As a result, Next is increased to n+1. static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { progress->nextIndex = nextIndex + 1; } -/** - * syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The - * arguments are the index of the append message rejected by the follower, and - * the hint that we want to decrease to. - * - * Rejections can happen spuriously as messages are sent out of order or - * duplicated. In such cases, the rejection pertains to an index that the - * Progress already knows were previously acknowledged, and false is returned - * without changing the Progress. - * - * If the rejection is genuine, Next is lowered sensibly, and the Progress is - * cleared for sending log entries. -**/ +// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. 
The +// arguments are the index of the append message rejected by the follower, and +// the hint that we want to decrease to. +// +// Rejections can happen spuriously as messages are sent out of order or +// duplicated. In such cases, the rejection pertains to an index that the +// Progress already knows was previously acknowledged, and false is returned +// without changing the Progress. +// +// If the rejection is genuine, Next is lowered sensibly, and the Progress is +// cleared for sending log entries. bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint); -/** - * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. - * This is done when a node has rejected recent MsgApps, is currently waiting - * for a snapshot, or has reached the MaxInflightMsgs limit. In normal - * operation, this is false. A throttled node will be contacted less frequently - * until it has reached a state in which it's able to accept a steady stream of - * log entries again. - **/ +// IsPaused returns whether sending log entries to this node has been throttled. +// This is done when a node has rejected recent MsgApps, is currently waiting +// for a snapshot, or has reached the MaxInflightMsgs limit. In normal +// operation, this is false. A throttled node will be contacted less frequently +// until it has reached a state in which it's able to accept a steady stream of +// log entries again. 
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { @@ -221,22 +204,35 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres return progress->recentActive; } -int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); +void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap); +void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap); + +void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap); +void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); + +SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); -int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress); void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); +bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + /** * return true if progress's log is up-todate **/ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); +// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending +// snapshot index. 
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); -void syncRaftProgressMapCopy(const SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); +// return true if reach the end +bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress); + +bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg); #if 0 diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 8adb0b4736ca572d4e46288c33b0db890c3a1b7e..0a3c7dd6fc57ef0adcb22382910cc0e710cabb70 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -21,7 +21,9 @@ #include "sync_raft_quorum_joint.h" #include "sync_raft_progress.h" #include "sync_raft_proto.h" +#include "thash.h" +// Config reflects the configuration tracked in a ProgressTracker. struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -83,34 +85,47 @@ struct SSyncRaftProgressTracker { SSyncRaftProgressMap progressMap; - ESyncRaftVoteType votes[TSDB_MAX_REPLICA]; + // nodeid -> ESyncRaftVoteType map + SHashObj* votesMap; + int maxInflightMsgs; + + SSyncRaft* pRaft; }; -SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); +SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft); + +void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config); +void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config); +void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config); + +// ResetVotes prepares for a new round of vote counting via recordVote. 
+void syncRaftResetVotes(SSyncRaftProgressTracker*); -typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); -/** - * syncRaftRecordVote records that the node with the given id voted for this Raft - * instance if v == true (and declined it otherwise). - **/ -void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant); +// RecordVote records that the node with the given id voted for this Raft +// instance if grant is true (and declined it otherwise). +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant); -void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result); +void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to); -int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); -/** - * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the - * election outcome is known. - **/ +// TallyVotes returns the number of granted and rejected Votes, and whether the +// election outcome is known. ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); -void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); +void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); + +// Committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. +SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker); + +// QuorumActive returns true if the quorum is active from the view of the local +// raft state machine. Otherwise, it returns false. 
+bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h index c131e91139d786c7b2fa186f3e7a52091860ba18..29371e328dc7365108075f3a42e7722bbadad03c 100644 --- a/source/libs/sync/inc/sync_raft_proto.h +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -17,6 +17,7 @@ #define TD_SYNC_RAFT_PROTO_H #include "sync_type.h" +#include "sync_raft_node_map.h" typedef enum ESyncRaftConfChangeType { SYNC_RAFT_Conf_AddNode = 0, @@ -58,4 +59,19 @@ typedef struct SSyncConfigState { bool autoLeave; } SSyncConfigState; +static FORCE_INLINE bool syncRaftConfArrayIsEmpty(const SSyncConfChangeSingleArray* ary) { + return ary->n == 0; +} + +static FORCE_INLINE void syncRaftInitConfArray(SSyncConfChangeSingleArray* ary) { + *ary = (SSyncConfChangeSingleArray) { + .changes = NULL, + .n = 0, + }; +} + +static FORCE_INLINE void syncRaftFreeConfArray(SSyncConfChangeSingleArray* ary) { + if (ary->changes != NULL) free(ary->changes); +} + #endif /* TD_SYNC_RAFT_PROTO_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 0ef002fe1a325ffb4ee7100dbac8aeba123bd8a9..9d5f10ab51c0b5fc3532899dccac1190e13b4101 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -19,24 +19,31 @@ #include "taosdef.h" #include "sync.h" #include "sync_type.h" +#include "sync_raft_node_map.h" +#include "thash.h" -/** - * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) - * majority configurations. Decisions require the support of both majorities. - **/ +// JointConfig is a configuration of two groups of (possibly overlapping) +// majority configurations. Decisions require the support of both majorities. 
typedef struct SSyncRaftQuorumJointConfig { SSyncRaftNodeMap outgoing; SSyncRaftNodeMap incoming; } SSyncRaftQuorumJointConfig; -/** - * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns - * a result indicating whether the vote is pending, lost, or won. A joint quorum - * requires both majority quorums to vote in favor. - **/ -ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); +// IDs returns a newly initialized map representing the set of voters present +// in the joint configuration. +void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap); -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); +// CommittedIndex returns the largest committed index for the given joint +// quorum. An index is jointly committed if it is committed in both constituent +// majorities. +SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg); + +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending, lost, or won. A joint quorum +// requires both majority quorums to vote in favor. 
+ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap); + +void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config); static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { return syncRaftIsInNodeMap(&config->outgoing, id); @@ -59,7 +66,19 @@ static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SS } static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { - memset(&config->outgoing, 0, sizeof(SSyncCluster)); + syncRaftClearNodeMap(&config->outgoing); +} + +static FORCE_INLINE bool syncRaftJointConfigIsIncomingEmpty(const SSyncRaftQuorumJointConfig* config) { + return syncRaftNodeMapSize(&config->incoming) == 0; +} + +static FORCE_INLINE bool syncRaftJointConfigIsOutgoingEmpty(const SSyncRaftQuorumJointConfig* config) { + return syncRaftNodeMapSize(&config->outgoing) == 0; +} + +static FORCE_INLINE bool syncRaftJointConfigIsInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + return syncRaftIsInNodeMap(&config->outgoing, id); } #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 0512a4dc87254b1054faf311fba04250d96eecf0..399bd71db8bd068b577d0890d1cab150ce524bc9 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -19,6 +19,7 @@ #include "sync.h" #include "sync_type.h" #include "sync_raft_quorum.h" +#include "thash.h" /** * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns @@ -26,6 +27,10 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). 
**/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap); + +// CommittedIndex computes the committed index from those supplied via the +// provided AckedIndexer (for the active config). +SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_raft_restore.h b/source/libs/sync/inc/sync_raft_restore.h index 38eadb00c7167e274cced01acaa11e05f1146f6a..df4448cab8b4d9550541a5ea342d3464492be9ec 100644 --- a/source/libs/sync/inc/sync_raft_restore.h +++ b/source/libs/sync/inc/sync_raft_restore.h @@ -27,6 +27,7 @@ // the Changer only needs a ProgressMap (not a whole Tracker) at which point // this can just take LastIndex and MaxInflight directly instead and cook up // the results from that alone. -int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs); +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); #endif /* TD_SYNC_RAFT_RESTORE_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index cb938c73191ec0d879e4b889d6e88710dacc6626..9c4bc9e63cce11f63eb82d6864c389a4fcbc85a5 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -32,6 +32,8 @@ typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct SSyncRaftProgressMap SSyncRaftProgressMap; typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; +typedef struct SSyncRaftNodeMap SSyncRaftNodeMap; + typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; typedef struct SSyncRaftChanger SSyncRaftChanger; @@ -68,11 +70,6 @@ typedef struct SSyncClusterConfig { const SSyncCluster* cluster; } 
SSyncClusterConfig; -typedef struct { - int32_t replica; - SyncNodeId nodeId[TSDB_MAX_REPLICA]; -} SSyncRaftNodeMap; - typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, @@ -80,9 +77,6 @@ typedef enum { } ESyncRaftElectionType; typedef enum { - // the init vote resp status - SYNC_RAFT_VOTE_RESP_UNKNOWN = 0, - // grant the vote request SYNC_RAFT_VOTE_RESP_GRANT = 1, @@ -90,4 +84,8 @@ typedef enum { SYNC_RAFT_VOTE_RESP_REJECT = 2, } ESyncRaftVoteType; +typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); + +typedef void (*matchAckIndexerFp)(SyncNodeId id, void* arg, SyncIndex* index); + #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 23351277c4bd983831cdb942ed9c934458f311bc..72b0d268a8d9acec0c3b25650bc614256c235bd8 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "sync_raft_restore.h" #include "raft_replication.h" @@ -59,8 +59,13 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { logStore = &(pRaft->logStore); fsm = &(pRaft->fsm); + pRaft->nodeInfoMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pRaft->nodeInfoMap == NULL) { + return -1; + } + // init progress tracker - pRaft->tracker = syncRaftOpenProgressTracker(); + pRaft->tracker = syncRaftOpenProgressTracker(pRaft); if (pRaft->tracker == NULL) { return -1; } @@ -96,11 +101,22 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { .tracker = pRaft->tracker, .lastIndex = syncRaftLogLastIndex(pRaft->log), }; - if (syncRaftRestoreConfig(&changer, &confState) < 0) { + SSyncRaftProgressTrackerConfig config; + SSyncRaftProgressMap progressMap; + + if (syncRaftRestoreConfig(&changer, &confState, &config, &progressMap) < 0) { 
syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId); return -1; } + // save restored config and progress map to tracker + syncRaftCopyProgressMap(&progressMap, &pRaft->tracker->progressMap); + syncRaftCopyTrackerConfig(&config, &pRaft->tracker->config); + + // free progress map and config + syncRaftFreeProgressMap(&progressMap); + syncRaftFreeTrackConfig(&config); + if (!syncRaftIsEmptyServerState(&serverState)) { syncRaftLoadState(pRaft, &serverState); } @@ -140,6 +156,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int32_t syncRaftTick(SSyncRaft* pRaft) { pRaft->currentTick += 1; + pRaft->tickFp(pRaft); return 0; } @@ -151,8 +168,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch return 0; } -static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { - syncRaftReplicate(arg, progress, false); +static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) { + syncRaftMaybeSendAppend(arg, progress, false); } // switchToConfig reconfigures this node to use the provided configuration. It @@ -169,13 +186,12 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi SSyncRaftProgress* progress = NULL; syncRaftConfigState(pRaft->tracker, cs); - i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId); - exist = (i != -1); + progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId); + exist = (progress != NULL); // Update whether the node itself is a learner, resetting to false when the // node is removed. if (exist) { - progress = &pRaft->tracker->progressMap.progress[i]; pRaft->isLearner = progress->isLearner; } else { pRaft->isLearner = false; @@ -196,7 +212,7 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // The remaining steps only make sense if this node is the leader and there // are other nodes. 
- if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) { + if (pRaft->state != TAOS_SYNC_STATE_LEADER || syncRaftNodeMapSize(&cs->voters) == 0) { return; } @@ -212,8 +228,11 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // If the the leadTransferee was removed or demoted, abort the leadership transfer. SyncNodeId leadTransferee = pRaft->leadTransferee; - if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { - abortLeaderTransfer(pRaft); + if (leadTransferee != SYNC_NON_NODE_ID) { + if (!syncRaftIsInNodeMap(&pRaft->tracker->config.voters.incoming, leadTransferee) && + !syncRaftIsInNodeMap(&pRaft->tracker->config.voters.outgoing, leadTransferee)) { + abortLeaderTransfer(pRaft); + } } } } @@ -286,8 +305,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) * but it will not receive MsgApp or MsgHeartbeat, so it will not create * disruptive term increases **/ - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return true; } SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); @@ -295,7 +314,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) return true; } - pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(msg, pNode); } else { // ignore other cases syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]", diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index 4797b6ce030b7ee5fdafc4142426604c2c4397db..92ebfe75f5ef3ebb26bb1fd5f00e850206db9936 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c 
@@ -16,15 +16,14 @@ #include "syncInt.h" #include "raft.h" #include "raft_log.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } @@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index); out: - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c index e536fc67c0df21b0ff2e23f4083fe3c57981ee6e..a58c8ba5cfa1d663c1486a2c2ccbf7d7c9b28708 100644 --- a/source/libs/sync/src/raft_handle_election_message.c +++ b/source/libs/sync/src/raft_handle_election_message.c @@ -19,24 +19,6 @@ #include "raft_message.h" int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - if (pRaft->state == TAOS_SYNC_STATE_LEADER) { - syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - - if (!syncRaftIsPromotable(pRaft)) { - syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - // if there is pending uncommitted config,cannot start election - if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { - syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", - pRaft->selfGroupId, pRaft->selfId, pRaft->term); - return 0; - } - - 
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - if (pRaft->preVote) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); } else { diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 709e319c3ee1c3d04b33ddc8042fedc239fc9d2c..0219e39df9767d3dc1be0e0604bf1451202e028a 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_message.h" @@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SSyncMessage* pRespMsg; - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voteIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } + bool grant; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); @@ -42,17 +43,19 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { grant ? "grant" : "reject", pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) { - return false; - } - if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { - return false; - } + bool canVote = + // We can vote if this is a repeat of a vote we've already cast... 
+ pRaft->voteFor == pMsg->from || + // ...we haven't voted and we don't think there's a leader yet in this term... + (pRaft->voteFor == SYNC_NON_NODE_ID && pRaft->leaderId == SYNC_NON_NODE_ID) || + // ...or this is a PreVote for a future term... + (pMsg->vote.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION && pMsg->term > pRaft->term); - return true; + // ...and we believe the candidate is up to date. + return canVote && syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm); } \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 1781205ec0bdfa361ce63d57e23e1ed1a7bc13ee..87a5cfcd159166c74f844e9331085ca464fe83a3 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE); - voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voterIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); return 0; } @@ -45,12 +45,14 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (result == SYNC_RAFT_VOTE_WON) { if (pRaft->candidateState.inPreVote) { - syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); + syncRaftCampaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { syncRaftBecomeLeader(pRaft); - + syncRaftBroadcastAppend(pRaft); } } else if (result == SYNC_RAFT_VOTE_LOST) { + // pb.MsgPreVoteResp contains future term of 
pre-candidate + // m.Term > r.Term; reuse r.Term syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index c19fcd1e68022f34abf8293a4975400a9257c60b..c8c2d2c3792f575782fc89f14b8655d87f534834 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -22,14 +22,14 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry); + SSyncRaftEntry *entries, int nEntry); -// syncRaftReplicate sends an append RPC with new entries to the given peer, +// maybeSendAppend sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). 
-bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { +bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { assert(pRaft->state == TAOS_SYNC_STATE_LEADER); SyncNodeId nodeId = progress->id; @@ -68,10 +68,13 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry) { + SSyncRaftEntry *entries, int nEntry) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id); + if (pNode == NULL) { + return false; + } SyncIndex lastIndex; - SyncTerm logTerm = prevTerm; - SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); + SyncTerm logTerm = prevTerm; SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, prevIndex, prevTerm, pRaft->log->commitIndex, @@ -87,7 +90,7 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, case PROGRESS_STATE_REPLICATE: lastIndex = entries[nEntry - 1].index; syncRaftProgressOptimisticNextIndex(progress, lastIndex); - syncRaftInflightAdd(&progress->inflights, lastIndex); + syncRaftInflightAdd(progress->inflights, lastIndex); break; case PROGRESS_STATE_PROBE: progress->probeSent = true; diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 2f25828d5d80011e891287d5b1add1a48709b0a9..06af8ff6c2294750663e40bbf958980272216950 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -99,7 +99,7 @@ void syncCleanUp() { SSyncNode* syncStart(const SSyncInfo* pInfo) { pthread_mutex_lock(&gSyncManager->mutex); - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId*)); if (ppNode != NULL) { syncInfo("vgroup %d already exist", pInfo->vgId); 
pthread_mutex_unlock(&gSyncManager->mutex); @@ -140,7 +140,7 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { void syncStop(const SSyncNode* pNode) { pthread_mutex_lock(&gSyncManager->mutex); - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId*)); if (ppNode == NULL) { syncInfo("vgroup %d not exist", pNode->vgId); pthread_mutex_unlock(&gSyncManager->mutex); @@ -288,7 +288,7 @@ static void *syncWorkerMain(void *argv) { static void syncNodeTick(void *param, void *tmrId) { SyncGroupId vgId = (SyncGroupId)param; - SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId)); + SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId*)); if (ppNode == NULL) { return; } diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 4e7f2190eaef22d2a6bbca5d835524894733bb56..de790b58767fb9e442a9bd5469e709e3e669cf02 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "raft.h" #include "syncInt.h" #include "sync_raft_config_change.h" #include "sync_raft_progress.h" @@ -40,40 +41,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id); static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, - SSyncRaftProgressMap* progressMap, SyncNodeId id); -// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) -// mutates the incoming majority config Voters[0] by at most one. 
This method -// will return an error if that is not the case, if the resulting quorum is -// zero, or if the configuration is in a joint state (i.e. if there is an -// outgoing configuration). -int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, - SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - int ret; - - ret = checkAndCopy(changer, config, progressMap); - if (ret != 0) { - return ret; - } - - if (hasJointConfig(config)) { - syncError("can't apply simple config change in joint config"); - return -1; - } - - ret = applyConfig(changer, config, progressMap, css); - if (ret != 0) { - return ret; - } - - int n = symDiff(syncRaftJointConfigIncoming(&changer->tracker->config.voters), - syncRaftJointConfigIncoming(&config->voters)); - if (n > 1) { - syncError("more than one voter changed without entering joint config"); - return -1; - } - - return checkAndReturn(config, progressMap); -} + SSyncRaftProgressMap* progressMap, SyncNodeId id); // EnterJoint verifies that the outgoing (=right) majority config of the joint // config is empty and initializes it with a copy of the incoming (=left) @@ -96,12 +64,13 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S if (ret != 0) { return ret; } + if (hasJointConfig(config)) { syncError("config is already joint"); return -1; } - if(config->voters.incoming.replica == 0) { + if(syncRaftJointConfigIsIncomingEmpty(&config->voters) == 0) { // We allow adding nodes to an empty config for convenience (testing and // bootstrap), but you can't enter a joint state. syncError("can't make a zero-voter config joint"); @@ -112,7 +81,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S syncRaftJointConfigClearOutgoing(&config->voters); // Copy incoming to outgoing. 
- memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster)); + syncRaftCopyNodeMap(&config->voters.incoming, &config->voters.outgoing); ret = applyConfig(changer, config, progressMap, css); if (ret != 0) { @@ -123,84 +92,43 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S return checkAndReturn(config, progressMap); } -// checkAndCopy copies the tracker's config and progress map (deeply enough for -// the purposes of the Changer) and returns those copies. It returns an error -// if checkInvariants does. -static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - syncRaftCloneTrackerConfig(&changer->tracker->config, config); - int i; - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - SSyncRaftProgress* progress = &(changer->tracker->progressMap.progress[i]); - if (progress->id == SYNC_NON_NODE_ID) { - continue; - } - syncRaftCopyProgress(progress, &(progressMap->progress[i])); +// Simple carries out a series of configuration changes that (in aggregate) +// mutates the incoming majority config Voters[0] by at most one. This method +// will return an error if that is not the case, if the resulting quorum is +// zero, or if the configuration is in a joint state (i.e. if there is an +// outgoing configuration). +int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret; + + ret = checkAndCopy(changer, config, progressMap); + if (ret != 0) { + return ret; } - return checkAndReturn(config, progressMap); -} -// checkAndReturn calls checkInvariants on the input and returns either the -// resulting error or the input. 
-static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - if (checkInvariants(config, progressMap) != 0) { + if (hasJointConfig(config)) { + syncError("can't apply simple config change in joint config"); return -1; } - return 0; -} - -// checkInvariants makes sure that the config and progress are compatible with -// each other. This is used to check both what the Changer is initialized with, -// as well as what it returns. -static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - int ret = syncRaftCheckProgress(config, progressMap); + ret = applyConfig(changer, config, progressMap, css); if (ret != 0) { return ret; } - int i; - // Any staged learner was staged because it could not be directly added due - // to a conflicting voter in the outgoing config. - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (!syncRaftJointConfigInOutgoing(&config->voters, config->learnersNext.nodeId[i])) { - return -1; - } - if (progressMap->progress[i].id != SYNC_NON_NODE_ID && progressMap->progress[i].isLearner) { - syncError("%d is in LearnersNext, but is already marked as learner", progressMap->progress[i].id); - return -1; - } - } - // Conversely Learners and Voters doesn't intersect at all. - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (syncRaftJointConfigInIncoming(&config->voters, config->learners.nodeId[i])) { - syncError("%d is in Learners and voter.incoming", progressMap->progress[i].id); - return -1; - } - if (progressMap->progress[i].id != SYNC_NON_NODE_ID && !progressMap->progress[i].isLearner) { - syncError("%d is in Learners, but is not marked as learner", progressMap->progress[i].id); - return -1; - } - } - - if (!hasJointConfig(config)) { - // We enforce that empty maps are nil instead of zero. 
- if (config->learnersNext.replica > 0) { - syncError("cfg.LearnersNext must be nil when not joint"); - return -1; - } - if (config->autoLeave) { - syncError("AutoLeave must be false when not joint"); - return -1; - } + int n = symDiff(syncRaftJointConfigIncoming(&changer->tracker->config.voters), + syncRaftJointConfigIncoming(&config->voters)); + if (n > 1) { + syncError("more than one voter changed without entering joint config"); + return -1; } - return 0; -} - -static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { - return config->voters.outgoing.replica > 0; + return checkAndReturn(config, progressMap); } +// apply a change to the configuration. By convention, changes to voters are +// always made to the incoming majority config Voters[0]. Voters[1] is either +// empty or preserves the outgoing majority configuration while in a joint state. static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { int i; @@ -227,7 +155,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig } } - if (config->voters.incoming.replica == 0) { + if (syncRaftJointConfigIsIncomingEmpty(&config->voters)) { syncError("removed all voters"); return -1; } @@ -235,86 +163,16 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig return 0; } -// symdiff returns the count of the symmetric difference between the sets of -// uint64s, i.e. len( (l - r) \union (r - l)). 
-static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) { - int n; - int i; - int j0, j1; - const SSyncRaftNodeMap* pairs[2][2] = { - {l, r}, // count elems in l but not in r - {r, l}, // count elems in r but not in l - }; - - for (n = 0, i = 0; i < 2; ++i) { - const SSyncRaftNodeMap** pp = pairs[i]; - - const SSyncRaftNodeMap* p0 = pp[0]; - const SSyncRaftNodeMap* p1 = pp[1]; - for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) { - SyncNodeId id = p0->nodeId[j0]; - if (id == SYNC_NON_NODE_ID) { - continue; - } - for (j1 = 0; j1 < p1->replica; ++j1) { - if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) { - n+=1; - } - } - } - } - - return n; -} - -static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, - SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) { - -} - -// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after. -static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { - int i; - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (nodeMap->nodeId[i] == id) { - nodeMap->replica -= 1; - nodeMap->nodeId[i] = SYNC_NON_NODE_ID; - break; - } - } - - assert(nodeMap->replica >= 0); -} - -// nilAwareAdd populates a map entry, creating the map if necessary. -static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { - int i, j; - for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { - if (nodeMap->nodeId[i] == id) { - return; - } - if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) { - j = i; - } - } - - assert(j != -1); - nodeMap->nodeId[j] = id; - nodeMap->replica += 1; -} // makeVoter adds or promotes the given ID to be a voter in the incoming // majority config. 
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i = syncRaftFindProgressIndexByNodeId(progressMap, id); - if (i == -1) { + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id); + if (progress == NULL) { initProgress(changer, config, progressMap, id, false); - i = syncRaftFindProgressIndexByNodeId(progressMap, id); + return; } - - assert(i != -1); - SSyncRaftProgress* progress = &(progressMap->progress[i]); progress->isLearner = false; nilAwareDelete(&config->learners, id); @@ -337,14 +195,12 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* // LeaveJoint(). static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i = syncRaftFindProgressIndexByNodeId(progressMap, id); - if (i == -1) { - initProgress(changer, config, progressMap, id, false); - i = syncRaftFindProgressIndexByNodeId(progressMap, id); + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id); + if (progress == NULL) { + initProgress(changer, config, progressMap, id, true); + return; } - - assert(i != -1); - SSyncRaftProgress* progress = &(progressMap->progress[i]); + if (progress->isLearner) { return; } @@ -352,15 +208,15 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi removeNodeId(changer, config, progressMap, id); // ... but save the Progress. - syncRaftAddToProgressMap(progressMap, id); + syncRaftAddToProgressMap(progressMap, progress); // Use LearnersNext if we can't add the learner to Learners directly, i.e. // if the peer is still tracked as a voter in the outgoing config. It will // be turned into a learner in LeaveJoint(). // // Otherwise, add a regular learner right away. 
- bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); - if (inOutgoing) { + bool inInOutgoing = syncRaftJointConfigIsInOutgoing(&config->voters, id); + if (inInOutgoing) { nilAwareAdd(&config->learnersNext, id); } else { nilAwareAdd(&config->learners, id); @@ -371,8 +227,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi // removeNodeId this peer as a voter or learner from the incoming config. static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i = syncRaftFindProgressIndexByNodeId(progressMap, id); - if (i == -1) { + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id); + if (progress == NULL) { return; } @@ -381,8 +237,173 @@ static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf nilAwareDelete(&config->learnersNext, id); // If the peer is still a voter in the outgoing config, keep the Progress. - bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); - if (!inOutgoing) { + bool inInOutgoing = syncRaftJointConfigIsInOutgoing(&config->voters, id); + if (!inInOutgoing) { syncRaftRemoveFromProgressMap(progressMap, id); } +} + +// initProgress initializes a new progress for the given node or learner. +static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) { + if (!isLearner) { + syncRaftJointConfigAddToIncoming(&config->voters, id); + } else { + nilAwareAdd(&config->learners, id); + } + + SSyncRaftProgress* pProgress = (SSyncRaftProgress*)malloc(sizeof(SSyncRaftProgress)); + assert (pProgress != NULL); + *pProgress = (SSyncRaftProgress) { + // Initializing the Progress with the last index means that the follower + // can be probed (with the last index). + // + // TODO(tbg): seems awfully optimistic. 
Using the first index would be + // better. The general expectation here is that the follower has no log + // at all (and will thus likely need a snapshot), though the app may + // have applied a snapshot out of band before adding the replica (thus + // making the first index the better choice). + .id = id, + .groupId = changer->tracker->pRaft->selfGroupId, + .nextIndex = changer->lastIndex, + .matchIndex = 0, + .state = PROGRESS_STATE_PROBE, + .pendingSnapshotIndex = 0, + .probeSent = false, + .inflights = syncRaftOpenInflights(changer->tracker->maxInflightMsgs), + .isLearner = isLearner, + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has had a chance to communicate with us. + .recentActive = true, + .refCount = 0, + }; + + syncRaftAddToProgressMap(progressMap, pProgress); +} + +// checkInvariants makes sure that the config and progress are compatible with +// each other. This is used to check both what the Changer is initialized with, +// as well as what it returns. +static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret = syncRaftCheckTrackerConfigInProgress(config, progressMap); + if (ret != 0) { + return ret; + } + + // Any staged learner was staged because it could not be directly added due + // to a conflicting voter in the outgoing config. 
+ SyncNodeId* pNodeId = NULL; + while (!syncRaftIterateNodeMap(&config->learnersNext, pNodeId)) { + SyncNodeId nodeId = *pNodeId; + if (!syncRaftJointConfigInOutgoing(&config->voters, nodeId)) { + syncError("[%d] is in LearnersNext, but not outgoing", nodeId); + return -1; + } + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId); + assert(progress); + assert(progress->id == nodeId); + if (progress->isLearner) { + syncError("[%d:%d] is in LearnersNext, but is already marked as learner", progress->groupId, nodeId); + return -1; + } + } + + // Conversely Learners and Voters doesn't intersect at all. + pNodeId = NULL; + while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) { + SyncNodeId nodeId = *pNodeId; + if (syncRaftJointConfigInOutgoing(&config->voters, nodeId)) { + syncError("%d is in Learners and outgoing", nodeId); + return -1; + } + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId); + assert(progress); + assert(progress->id == nodeId); + + if (!progress->isLearner) { + syncError("[%d:%d] is in Learners, but is not marked as learner", progress->groupId, nodeId); + return -1; + } + } + + if (!hasJointConfig(config)) { + // We enforce that empty maps are nil instead of zero. + if (syncRaftNodeMapSize(&config->learnersNext) > 0) { + syncError("cfg.LearnersNext must be nil when not joint"); + return -1; + } + if (config->autoLeave) { + syncError("AutoLeave must be false when not joint"); + return -1; + } + } + + return 0; +} + +// checkAndCopy copies the tracker's config and progress map (deeply enough for +// the purposes of the Changer) and returns those copies. It returns an error +// if checkInvariants does. 
+static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
+  // Outputs: `config` receives a copy of the tracker's config and `progressMap`
+  // is emptied and then refilled from the tracker's progress map.
+  syncRaftCopyTrackerConfig(&changer->tracker->config, config);
+  syncRaftClearProgressMap(progressMap);
+
+  // The same progress objects are re-added to the output map (no per-entry
+  // copy is made here).
+  // NOTE(review): the cursor is passed by value, so advancing the iteration
+  // depends entirely on how syncRaftIterateProgressMap treats it - confirm.
+  SSyncRaftProgress* pProgress = NULL;
+  while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {
+    syncRaftAddToProgressMap(progressMap, pProgress);
+  }
+
+  // Returns 0 when the copies pass checkInvariants, -1 otherwise.
+  return checkAndReturn(config, progressMap);
+}
+
+// checkAndReturn runs checkInvariants on the given config/progress pair and
+// collapses its result to 0 (invariants hold) or -1 (any violation).
+static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
+  if (checkInvariants(config, progressMap) != 0) {
+    return -1;
+  }
+
+  return 0;
+}
+
+// hasJointConfig reports whether the config is joint, i.e. whether the
+// outgoing half of the voters joint config is non-empty.
+static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
+  return !syncRaftJointConfigIsOutgoingEmpty(&config->voters);
+}
+
+// symDiff returns the size of the symmetric difference between two node-id
+// maps, i.e. len( (l - r) \union (r - l) ).
+static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
+  int n = 0;
+  int i;
+  const SSyncRaftNodeMap* pairs[2][2] = {
+    {l, r}, // count elems in l but not in r
+    {r, l}, // count elems in r but not in l
+  };
+
+  for (i = 0; i < 2; ++i) {
+    const SSyncRaftNodeMap* p0 = pairs[i][0];
+    const SSyncRaftNodeMap* p1 = pairs[i][1];
+
+    // Every pass must start from a NULL cursor; handing an uninitialised
+    // pointer to the iterator is undefined behaviour.
+    // NOTE(review): syncRaftIterateNodeMap takes the cursor by value - confirm
+    // that it can actually advance and that dereferencing pNodeId is valid.
+    SyncNodeId* pNodeId = NULL;
+    while (!syncRaftIterateNodeMap(p0, pNodeId)) {
+      if (!syncRaftIsInNodeMap(p1, *pNodeId)) {
+        n += 1;
+      }
+    }
+  }
+
+  return n;
+}
+
+// nilAwareDelete removes id from nodeMap. Despite the name, the map itself is
+// not released when it becomes empty; the call only forwards to
+// syncRaftRemoveFromNodeMap.
+static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
+  syncRaftRemoveFromNodeMap(nodeMap, id);
+}
+
+// nilAwareAdd populates a map entry, creating the map if necessary.
+static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { + syncRaftAddToNodeMap(nodeMap, id); } \ No newline at end of file diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/sync_raft_election.c similarity index 51% rename from source/libs/sync/src/raft_election.c rename to source/libs/sync/src/sync_raft_election.c index eb310c31eceeca07e9736a7d175723bf07661c23..fe2e0fd9d318f8b4788012bac6a2071174c1d1a3 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -17,15 +17,40 @@ #include "raft.h" #include "raft_log.h" #include "raft_message.h" +#include "sync_raft_progress_tracker.h" void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { - SyncTerm term; + if (pRaft->state == TAOS_SYNC_STATE_LEADER) { + syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); + return; + } + + if (!syncRaftIsPromotable(pRaft)) { + syncWarn("[%d:%d] is unpromotable and can not syncRaftCampaign", pRaft->selfGroupId, pRaft->selfId); + return; + } + + // if there is pending uncommitted config,cannot start election + if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { + syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", + pRaft->selfGroupId, pRaft->selfId, pRaft->term); + return; + } + + syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + + syncRaftCampaign(pRaft, cType); +} + +// syncRaftCampaign transitions the raft instance to candidate state. This must only be +// called after verifying that this is a legitimate transition. 
+void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { bool preVote; - ESyncRaftMessageType voteMsgType; + SyncTerm term; if (syncRaftIsPromotable(pRaft)) { - syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); - return 0; + syncDebug("[%d:%d] is unpromotable; syncRaftCampaign() should have been called", pRaft->selfGroupId, pRaft->selfId); + return; } if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { @@ -35,7 +60,6 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { term = pRaft->term + 1; } else { syncRaftBecomeCandidate(pRaft); - voteMsgType = RAFT_MSG_VOTE; term = pRaft->term; preVote = false; } @@ -43,10 +67,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int quorum = syncRaftQuorum(pRaft); ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); if (result == SYNC_RAFT_VOTE_WON) { - /** - * We won the election after voting for ourselves (which must mean that - * this is a single-node cluster). Advance to the next state. - **/ + // We won the election after voting for ourselves (which must mean that + // this is a single-node cluster). Advance to the next state. 
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { @@ -59,12 +81,23 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int i; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (i == pRaft->cluster.selfIndex) { + SSyncRaftNodeMap nodeMap; + syncRaftJointConfigIDs(&pRaft->tracker->config.voters, &nodeMap); + SyncNodeId *pNodeId = NULL; + while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) { + SyncNodeId nodeId = *pNodeId; + if (nodeId == SYNC_NON_NODE_ID) { continue; } - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; + if (nodeId == pRaft->selfId) { + continue; + } + + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, nodeId); + if (pNode == NULL) { + continue; + } SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, term, cType, lastIndex, lastTerm); @@ -72,10 +105,10 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { continue; } - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "", + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent vote request to %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, - lastIndex, voteMsgType, nodeId, pRaft->term); + lastIndex, nodeId, pRaft->term); - pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); + pRaft->io.send(pMsg, pNode); } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index d65e03c64f08416d467e829cd5faf2b7dc36e179..3050bb2c8aa9186df4c9f16db2c8a5a883476533 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_replication.h" #include 
"sync_raft_progress_tracker.h" @@ -25,6 +25,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); + static int triggerAll(SSyncRaft* pRaft); static void tickElection(SSyncRaft* pRaft); @@ -82,13 +84,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { resetRaft(pRaft, pRaft->term); pRaft->leaderId = pRaft->leaderId; pRaft->state = TAOS_SYNC_STATE_LEADER; - // TODO: check if there is pending config log - int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log); - if (nPendingConf > 1) { - syncFatal("unexpected multiple uncommitted config entry"); - } - syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId); + assert(progress != NULL); + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that r.reset() has initialized this + // progress with the last index already. + syncRaftProgressBecomeReplicate(progress); + + // Conservatively set the pendingConfIndex to the last index in the + // log. There may or may not be a pending config change, but it's + // safe to delay any future proposals until we commit all our + // pending log entries, and scanning the entire tail of the log + // could be expensive. 
+ SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + pRaft->pendingConfigIndex = lastIndex; // after become leader, send a no-op log SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); @@ -103,6 +114,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { }; appendEntries(pRaft, entry, 1); //syncRaftTriggerHeartbeat(pRaft); + syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); } void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { @@ -123,15 +135,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { } int syncRaftQuorum(SSyncRaft* pRaft) { - return pRaft->cluster.replica / 2 + 1; + return 0; + //return pRaft->cluster.replica / 2 + 1; } ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool grant, int* rejected, int *granted) { - int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); - if (voterIndex == -1) { - return SYNC_RAFT_VOTE_PENDING; + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id); + if (pNode == NULL) { + return true; } if (grant) { @@ -142,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); } - syncRaftRecordVote(pRaft->tracker, voterIndex, grant); + syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant); return syncRaftTallyVotes(pRaft->tracker, rejected, granted); } /* @@ -154,7 +167,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); } - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); + int voteIndex = syncRaftGetNodeById(pRaft, id); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); @@ -185,19 +198,30 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { pRaft->voteFor = serverState->voteFor; } -static void visitProgressSendAppend(int 
i, SSyncRaftProgress* progress, void* arg) { +static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) { SSyncRaft* pRaft = (SSyncRaft*)arg; if (pRaft->selfId == progress->id) { return; } - syncRaftReplicate(arg, progress, true); + syncRaftMaybeSendAppend(arg, progress, true); } +// bcastAppend sends RPC, with entries to all peers that are not up-to-date +// according to the progress recorded in r.prs. void syncRaftBroadcastAppend(SSyncRaft* pRaft) { syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) { + SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SyncNodeId*)); + if (ppNode != NULL) { + return *ppNode; + } + + return NULL; +} + static int convertClear(SSyncRaft* pRaft) { } @@ -223,7 +247,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { syncRaftHandleVoteRespMessage(pRaft, pMsg); return 0; } else if (msgType == RAFT_MSG_APPEND) { - syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from); + syncRaftBecomeFollower(pRaft, pMsg->term, pMsg->from); syncRaftHandleAppendEntriesMessage(pRaft, pMsg); } return 0; @@ -234,9 +258,7 @@ static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } -/** - * tickElection is run by followers and candidates per tick. - **/ +// tickElection is run by followers and candidates after r.electionTimeout. static void tickElection(SSyncRaft* pRaft) { pRaft->electionElapsed += 1; @@ -254,10 +276,16 @@ static void tickElection(SSyncRaft* pRaft) { syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); } +// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. 
static void tickHeartbeat(SSyncRaft* pRaft) { } +// TODO +static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { + return false; +} + static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm term = pRaft->term; @@ -268,9 +296,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { entries[i].index = lastIndex + 1 + i; } + // Track the size of this uncommitted proposal. + if (!increaseUncommittedSize(pRaft, entries, n)) { + // Drop the proposal. + return; + } + syncRaftLogAppend(pRaft->log, entries, n); - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId); + assert(progress != NULL); syncRaftProgressMaybeUpdate(progress, lastIndex); // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. 
syncRaftMaybeCommit(pRaft); @@ -297,7 +332,7 @@ static int triggerAll(SSyncRaft* pRaft) { continue; } - syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true); + syncRaftMaybeSendAppend(pRaft, pRaft->tracker->progressMap.progress[i], true); } #endif return 0; @@ -307,8 +342,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { pRaft->leadTransferee = SYNC_NON_NODE_ID; } -static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { - syncRaftInitProgress(i, (SSyncRaft*)arg, progress); +static void resetProgress(SSyncRaftProgress* progress, void* arg) { + syncRaftResetProgress((SSyncRaft*)arg, progress); } static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { @@ -327,7 +362,7 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { abortLeaderTransfer(pRaft); syncRaftResetVotes(pRaft->tracker); - syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); + syncRaftProgressVisit(pRaft->tracker, resetProgress, pRaft); pRaft->pendingConfigIndex = 0; pRaft->uncommittedSize = 0; diff --git a/source/libs/sync/src/sync_raft_inflights.c b/source/libs/sync/src/sync_raft_inflights.c index 3d740b5a9e18f8b8461c9db15c3f635da50b55a9..7b97aca014c41847492ab7507ddb3ce5c92fef8b 100644 --- a/source/libs/sync/src/sync_raft_inflights.c +++ b/source/libs/sync/src/sync_raft_inflights.c @@ -40,19 +40,16 @@ void syncRaftCloseInflights(SSyncRaftInflights* inflights) { free(inflights); } -/** - * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being - * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() - * to verify that there is room for one more message, - * and consecutive calls to add syncRaftInflightAdd() must provide a - * monotonic sequence of indexes. - **/ +// Add notifies the Inflights that a new message with the given index is being +// dispatched. 
Full() must be called prior to Add() to verify that there is room +// for one more message, and consecutive calls to add Add() must provide a +// monotonic sequence of indexes. void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { assert(!syncRaftInflightFull(inflights)); int next = inflights->start + inflights->count; int size = inflights->size; - /* is next wrapped around buffer? */ + if (next >= size) { next -= size; } @@ -61,12 +58,10 @@ void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) inflights->count++; } -/** - * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. - **/ +// FreeLE frees the inflights smaller or equal to the given `to` flight. void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { - /* out of the left side of the window */ + // out of the left side of the window return; } @@ -95,10 +90,8 @@ void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { } } -/** - * syncRaftInflightFreeFirstOne releases the first inflight. - * This is a no-op if nothing is inflight. - **/ +// FreeFirstOne releases the first inflight. This is a no-op if nothing is +// inflight. void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]); } diff --git a/source/libs/sync/src/sync_raft_node_map.c b/source/libs/sync/src/sync_raft_node_map.c new file mode 100644 index 0000000000000000000000000000000000000000..642eebe65bbc858bf2ffc0e1ceb246d50fd728f7 --- /dev/null +++ b/source/libs/sync/src/sync_raft_node_map.c @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_node_map.h" +#include "sync_type.h" +#include "sync_raft_progress.h" + +void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) { + nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); +} + +void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap) { + taosHashCleanup(nodeMap->nodeIdMap); +} + +void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap) { + taosHashClear(nodeMap->nodeIdMap); +} + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + SyncNodeId** ppId = (SyncNodeId**)taosHashGet(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*)); + if (ppId == NULL) { + return false; + } + return true; +} + +void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) { + SyncNodeId *pId = NULL; + while (!syncRaftIterateNodeMap(from, pId)) { + taosHashPut(to->nodeIdMap, &pId, sizeof(SyncNodeId*), &pId, sizeof(SyncNodeId*)); + } +} + +bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) { + SyncNodeId **ppId = taosHashIterate(nodeMap->nodeIdMap, pId); + if (ppId == NULL) { + return true; + } + + *pId = *(*ppId); + return false; +} + +bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap) { + SyncNodeId *pId = NULL; + while (!syncRaftIterateNodeMap(nodeMap, pId)) { + if (!syncRaftIsInProgressMap(progressMap, *pId)) { + return false; + } + } + + return 
true; +} + +void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { + syncRaftCopyNodeMap(nodeMap, to); +} + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + taosHashPut(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*), &nodeId, sizeof(SyncNodeId*)); +} + +void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + taosHashRemove(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*)); +} + +int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) { + return taosHashGetSize(nodeMap->nodeIdMap); +} \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index a53aae93d0115df4dcfbc534da2396d1b95a6245..a3ab93c0fcb5d5d409a3251d3906d890709ef53f 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -20,18 +20,26 @@ #include "sync.h" #include "syncInt.h" +static void copyProgress(SSyncRaftProgress* progress, void* arg); + +static void refProgress(SSyncRaftProgress* progress); +static void unrefProgress(SSyncRaftProgress* progress, void*); + static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state); static void probeAcked(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress); -void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { +void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) { + if (progress->inflights) { + syncRaftCloseInflights(progress->inflights); + } SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs); if (inflights == NULL) { return; } *progress = (SSyncRaftProgress) { - .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, + .matchIndex = progress->id == pRaft->selfId ? 
syncRaftLogLastIndex(pRaft->log) : 0, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .inflights = inflights, .isLearner = false, @@ -39,11 +47,9 @@ void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) }; } -/** - * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the - * index acked by it. The method returns false if the given n index comes from - * an outdated message. Otherwise it updates the progress and returns true. - **/ +// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the +// index acked by it. The method returns false if the given n index comes from +// an outdated message. Otherwise it updates the progress and returns true. bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool updated = false; @@ -58,27 +64,36 @@ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastInde return updated; } +// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The +// arguments are the index of the append message rejected by the follower, and +// the hint that we want to decrease to. +// +// Rejections can happen spuriously as messages are sent out of order or +// duplicated. In such cases, the rejection pertains to an index that the +// Progress already knows were previously acknowledged, and false is returned +// without changing the Progress. +// +// If the rejection is genuine, Next is lowered sensibly, and the Progress is +// cleared for sending log entries. bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint) { if (progress->state == PROGRESS_STATE_REPLICATE) { - /** - * the rejection must be stale if the progress has matched and "rejected" - * is smaller than "match". - **/ + // The rejection must be stale if the progress has matched and "rejected" + // is smaller than "match". 
if (rejected <= progress->matchIndex) { syncDebug("match index is up to date,ignore"); return false; } - /* directly decrease next to match + 1 */ + // Directly decrease next to match + 1. + // + // TODO(tbg): why not use matchHint if it's larger? progress->nextIndex = progress->matchIndex + 1; return true; } - /** - * The rejection must be stale if "rejected" does not match next - 1. This - * is because non-replicating followers are probed one entry at a time. - **/ + // The rejection must be stale if "rejected" does not match next - 1. This + // is because non-replicating followers are probed one entry at a time. if (rejected != progress->nextIndex - 1) { syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" , rejected, progress->nextIndex); @@ -91,14 +106,12 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, return true; } -/** - * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. - * This is done when a node has rejected recent MsgApps, is currently waiting - * for a snapshot, or has reached the MaxInflightMsgs limit. In normal - * operation, this is false. A throttled node will be contacted less frequently - * until it has reached a state in which it's able to accept a steady stream of - * log entries again. - **/ +// IsPaused returns whether sending log entries to this node has been throttled. +// This is done when a node has rejected recent MsgApps, is currently waiting +// for a snapshot, or has reached the MaxInflightMsgs limit. In normal +// operation, this is false. A throttled node will be contacted less frequently +// until it has reached a state in which it's able to accept a steady stream of +// log entries again. 
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { switch (progress->state) { case PROGRESS_STATE_PROBE: @@ -112,58 +125,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { } } -int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i; - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (progressMap->progress[i].id == id) { - return i; - } +SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { + SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)); + if (ppProgress == NULL) { + return NULL; } - return -1; + + return *ppProgress; } -int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i, j; +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress) { + refProgress(progress); + taosHashPut(progressMap->progressMap, &progress->id, sizeof(SyncNodeId*), &progress, sizeof(SSyncRaftProgress*)); +} - for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { - if (progressMap->progress[i].id == id) { - return i; - } - if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) { - j = i; - } +void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)); + if (ppProgress == NULL) { + return; } + unrefProgress(*ppProgress, NULL); - assert(j != -1); - - progressMap->progress[i].id = id; + taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*)); } -void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { - int i; - - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (progressMap->progress[i].id == id) { - progressMap->progress[i].id = SYNC_NON_NODE_ID; - break; - } - } +bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + 
return taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)) != NULL; } bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } -/** - * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, - * optionally and if larger, the index of the pending snapshot. - **/ +// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, +// optionally and if larger, the index of the pending snapshot. void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { - /** - * If the original state is ProgressStateSnapshot, progress knows that - * the pending snapshot has been sent to this peer successfully, then - * probes from pendingSnapshot + 1. - **/ + // If the original state is StateSnapshot, progress knows that + // the pending snapshot has been sent to this peer successfully, then + // probes from pendingSnapshot + 1. if (progress->state == PROGRESS_STATE_SNAPSHOT) { SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; resetProgressState(progress, PROGRESS_STATE_PROBE); @@ -174,111 +173,88 @@ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { } } -/** - * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. - **/ +// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { resetProgressState(progress, PROGRESS_STATE_REPLICATE); progress->nextIndex = progress->matchIndex + 1; } +// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending +// snapshot index. 
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = snapshotIndex; } void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) { - -} - -/** - * ResetState moves the Progress into the specified State, resetting ProbeSent, - * PendingSnapshot, and Inflights. - **/ -static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) { - progress->probeSent = false; - progress->pendingSnapshotIndex = 0; - progress->state = state; - syncRaftInflightReset(progress->inflights); + memcpy(out, progress, sizeof(SSyncRaftProgress)); } -/** - * probeAcked is called when this peer has accepted an append. It resets - * ProbeSent to signal that additional append messages should be sent without - * further delay. - **/ -static void probeAcked(SSyncRaftProgress* progress) { - progress->probeSent = false; +void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap) { + progressMap->progressMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); } -#if 0 - -SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].nextIndex; +void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap) { + syncRaftVisitProgressMap(progressMap, unrefProgress, NULL); + taosHashCleanup(progressMap->progressMap); } -SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].matchIndex; +void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap) { + taosHashClear(progressMap->progressMap); } -void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft); +void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to) { + syncRaftVisitProgressMap(from, copyProgress, to); } -void 
syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft); -} +bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress) { + SSyncRaftProgress **ppProgress = taosHashIterate(progressMap->progressMap, pProgress); + if (ppProgress == NULL) { + return true; + } -bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - bool prev = progress->recentRecv; - progress->recentRecv = false; - return prev; + *pProgress = *(*ppProgress); + return false; } -void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].recentRecv = true; +bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg) { + SSyncRaftProgress *pProgress; + while (!syncRaftIterateProgressMap(progressMap, pProgress)) { + fp(pProgress, arg); + } } -bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].recentRecv; +static void copyProgress(SSyncRaftProgress* progress, void* arg) { + assert(progress->refCount > 0); + SSyncRaftProgressMap* to = (SSyncRaftProgressMap*)arg; + syncRaftAddToProgressMap(to, progress); } -void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); - progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); +static void refProgress(SSyncRaftProgress* progress) { + progress->refCount += 1; } -void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - - if (progress->state == PROGRESS_STATE_SNAPSHOT) { - assert(progress->pendingSnapshotIndex > 0); - SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; - resetProgressState(progress, PROGRESS_STATE_PROBE); - 
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); - } else { - resetProgressState(progress, PROGRESS_STATE_PROBE); - progress->nextIndex = progress->matchIndex + 1; +static void unrefProgress(SSyncRaftProgress* progress, void* arg) { + (void)arg; + progress->refCount -= 1; + assert(progress->refCount >= 0); + if (progress->refCount == 0) { + free(progress); } } - -void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { - resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE); - pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; -} - -void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + +// ResetState moves the Progress into the specified State, resetting ProbeSent, +// PendingSnapshot, and Inflights. +static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) { + progress->probeSent = false; progress->pendingSnapshotIndex = 0; - progress->state = PROGRESS_STATE_PROBE; + progress->state = state; + syncRaftInflightReset(progress->inflights); } -ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].state; +// ProbeAcked is called when this peer has accepted an append. It resets +// ProbeSent to signal that additional append messages should be sent without +// further delay. +static void probeAcked(SSyncRaftProgress* progress) { + progress->probeSent = false; } - - - -#endif \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index ea7f1ae4f5fed568a466ec3504e864b8377fcd2d..e0b4afae21221f50a2f17de3ef66c2e747a49707 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -13,62 +13,99 @@ * along with this program. If not, see . 
*/ +#include "raft.h" +#include "sync_const.h" #include "sync_raft_progress_tracker.h" #include "sync_raft_proto.h" -SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { +SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) { SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); if (tracker == NULL) { return NULL; } + tracker->votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + + syncRaftInitTrackConfig(&tracker->config); + tracker->pRaft = pRaft; + tracker->maxInflightMsgs = kSyncRaftMaxInflghtMsgs; + return tracker; } +void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config) { + syncRaftInitNodeMap(&config->learners); + syncRaftInitNodeMap(&config->learnersNext); + syncRaftInitQuorumJointConfig(&config->voters); + config->autoLeave = false; +} + +void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) { + syncRaftFreeNodeMap(&config->learners); + syncRaftFreeNodeMap(&config->learnersNext); + syncRaftFreeNodeMap(&config->voters.incoming); + syncRaftFreeNodeMap(&config->voters.outgoing); +} + +// ResetVotes prepares for a new round of vote counting via recordVote. 
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { - memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteType) * TSDB_MAX_REPLICA); + taosHashClear(tracker->votesMap); } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { - int i; - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - SSyncRaftProgress* progress = &(tracker->progressMap.progress[i]); - visit(i, progress, arg); - } + syncRaftVisitProgressMap(&tracker->progressMap, visit, arg); } -void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { - if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) { +// RecordVote records that the node with the given id voted for this Raft +// instance if grant == true (and declined it otherwise). A vote already recorded for id is kept. +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { + const bool* pVote = (const bool*)taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId)); // same key length as the insert below + if (pVote != NULL) { return; } - tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; + taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &grant, sizeof(bool)); // votes are stored as bool values } -void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { +void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { + memcpy(to, from, sizeof(SSyncRaftProgressTrackerConfig)); +} +int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + // NB: intentionally allow the empty config. In production we'll never see a + // non-empty config (we prevent it from being created) but we will need to + // be able to *create* an initial config, for example during bootstrap (or + // during tests). Instead of having to hand-code this, we allow + // transitioning from an empty config into any other legal and non-empty + // config. 
+ if (!syncRaftIsAllNodeInProgressMap(&config->voters.incoming, progressMap)) return -1; + if (!syncRaftIsAllNodeInProgressMap(&config->voters.outgoing, progressMap)) return -1; + if (!syncRaftIsAllNodeInProgressMap(&config->learners, progressMap)) return -1; + if (!syncRaftIsAllNodeInProgressMap(&config->learnersNext, progressMap)) return -1; + return 0; } -/** - * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the - * election outcome is known. - **/ +// TallyVotes returns the number of granted and rejected Votes, and whether the +// election outcome is known. ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { - int i; - SSyncRaftProgress* progress; + SSyncRaftProgress* progress = NULL; int r, g; - for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { - progress = &(tracker->progressMap.progress[i]); + // Make sure to populate granted/rejected correctly even if the Votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. 
+ while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) { if (progress->id == SYNC_NON_NODE_ID) { continue; } - if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { + bool* v = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*)); + if (v == NULL) { continue; } - if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + if (*v) { g++; } else { r++; @@ -77,12 +114,43 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r if (rejected) *rejected = r; if (granted) *granted = g; - return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); + return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap); +} + +void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { + syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters); + syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing); + syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners); + syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext); + cs->autoLeave = tracker->config.autoLeave; } -void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { - memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap)); - memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap)); - memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap)); - memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap)); +static void matchAckIndexer(SyncNodeId id, void* arg, SyncIndex* index) { + SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)arg; + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&tracker->progressMap, id); + if (progress == NULL) { + *index = 0; + return; + } + *index = progress->matchIndex; +} + +// Committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. 
+SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker) { + return syncRaftJointConfigCommittedIndex(&tracker->config.voters, matchAckIndexer, tracker); +} + +static void visitProgressActive(SSyncRaftProgress* progress, void* arg) { + SHashObj* votesMap = (SHashObj*)arg; + taosHashPut(votesMap, &progress->id, sizeof(SyncNodeId), &progress->recentActive, sizeof(bool)); +} + +// QuorumActive returns true if the quorum is active from the view of the local +// raft state machine. Otherwise, it returns false. +bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker) { + SHashObj* votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + syncRaftVisitProgressMap(&tracker->progressMap, visitProgressActive, votesMap); + + bool active = syncRaftVoteResult(&tracker->config.voters, votesMap) == SYNC_RAFT_VOTE_WON; + // The scratch map is only needed for this tally; release it before returning. + taosHashCleanup(votesMap); + return active; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index fa663b6fc34166cc6d69824f5a2d7e85fcc5e717..70c078b6f586a4a655f977687306c63def07b316 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "sync_raft_node_map.h" #include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_quorum.h" @@ -22,9 +23,9 @@ * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. 
**/ -ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes) { - ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votes); - ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votes); +ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap) { + ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votesMap); + ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votesMap); if (r1 == r2) { // If they agree, return the agreed state. @@ -40,46 +41,35 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E return SYNC_RAFT_VOTE_PENDING; } -void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - int i, min; +void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config) { + syncRaftInitNodeMap(&config->incoming); + syncRaftInitNodeMap(&config->outgoing); +} - for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeId[i] == id) { - return; - } - if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) { - min = i; - } - } +void syncRaftFreeQuorumJointConfig(SSyncRaftQuorumJointConfig* config) { + syncRaftFreeNodeMap(&config->incoming); + syncRaftFreeNodeMap(&config->outgoing); +} - assert(min != -1); - config->incoming.nodeId[min] = id; - config->incoming.replica += 1; +void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + syncRaftAddToNodeMap(&config->incoming, id); } void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - int i; + syncRaftRemoveFromNodeMap(&config->incoming, id); +} - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeId[i] == id) { - config->incoming.replica -= 1; - config->incoming.nodeId[i] = SYNC_NON_NODE_ID; - break; - } - } +void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* 
config, SSyncRaftNodeMap* nodeMap) { + syncRaftCopyNodeMap(&config->incoming, nodeMap); - assert(config->incoming.replica >= 0); + syncRaftUnionNodeMap(&config->outgoing, nodeMap); } +SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg) { + SyncIndex index0, index1; -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { - int i; - - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (nodeId == nodeMap->nodeId[i]) { - return true; - } - } + index0 = syncRaftMajorityConfigCommittedIndex(&config->incoming, indexer, arg); + index1 = syncRaftMajorityConfigCommittedIndex(&config->outgoing, indexer, arg); - return false; + return index0 < index1 ? index0 : index1; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 73eb378e09ac21827ee3fe3c18eae49c91dde2ed..313f213cdac5a7cb99a5598991c8e9171161f3d6 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -13,42 +13,109 @@ * along with this program. If not, see . */ +#include "sync_const.h" #include "sync_raft_quorum.h" #include "sync_raft_quorum_majority.h" +#include "sync_raft_node_map.h" -/** - * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns - * a result indicating whether the vote is pending (i.e. neither a quorum of - * yes/no has been reached), won (a quorum of yes has been reached), or lost (a - * quorum of no has been reached). - **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) { - if (config->replica == 0) { +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending (i.e. 
neither a quorum of +// yes/no has been reached), won (a quorum of yes has been reached), or lost (a +// quorum of no has been reached). +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) { + int n = syncRaftNodeMapSize(config); + if (n == 0) { + // By convention, the elections on an empty config win. This comes in + // handy with joint quorums because it'll make a half-populated joint + // quorum behave like a majority quorum. return SYNC_RAFT_VOTE_WON; } int i, g, r, missing; - for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->nodeId[i] == SYNC_NON_NODE_ID) { + i = g = r = missing = 0; + SyncNodeId* pId = NULL; + while (!syncRaftIterateNodeMap(config, pId)) { + const bool* v = (const bool*)taosHashGet(votesMap, pId, sizeof(SyncNodeId)); // votes are keyed by the node id value + if (v == NULL) { + missing += 1; continue; } - if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { - missing += 1; - } else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + if (*v) { g +=1; } else { r += 1; } } - int quorum = config->replica / 2 + 1; + int quorum = n / 2 + 1; if (g >= quorum) { return SYNC_RAFT_VOTE_WON; } - if (r + missing >= quorum) { + if (g + missing >= quorum) { return SYNC_RAFT_VOTE_PENDING; } return SYNC_RAFT_VOTE_LOST; +} + +int compSyncIndex(const void * elem1, const void * elem2) { // qsort comparator: ascending SyncIndex order + SyncIndex index1 = *((const SyncIndex*)elem1); + SyncIndex index2 = *((const SyncIndex*)elem2); + if (index1 > index2) return 1; + if (index1 < index2) return -1; + return 0; +} + +SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg) { + int n = syncRaftNodeMapSize(config); + if (n == 0) { + // This plays well with joint quorums which, when one half is the zero + // MajorityConfig, should behave like the other half. + return kMaxCommitIndex; + } + + // Use an on-stack slice to collect the committed indexes when n <= 7 + // (otherwise we alloc). 
The alternative is to stash a slice on + // MajorityConfig, but this impairs usability (as is, MajorityConfig is just + // a map, and that's nice). The assumption is that running with a + // replication factor of >7 is rare, and in cases in which it happens + // performance is a lesser concern (additionally the performance + // implications of an allocation here are far from drastic). + SyncIndex* srt = NULL; + SyncIndex srk[TSDB_MAX_REPLICA]; + if (n > TSDB_MAX_REPLICA) { + srt = (SyncIndex*)malloc(sizeof(SyncIndex) * n); + if (srt == NULL) { + return kMaxCommitIndex; + } + } else { + srt = &srk[0]; + } + + // Fill the slice with the indexes observed. Any unused slots will be + // left as zero; these correspond to voters that may report in, but + // haven't yet. We fill from the right (since the zeroes will end up on + // the left after sorting below anyway). + SyncNodeId *pId = NULL; + int i = 0; + SyncIndex index; + while (!syncRaftIterateNodeMap(config, pId)) { + indexer(*pId, arg, &index); + srt[i++] = index; + } + + // Sort by index. Use a bespoke algorithm (copied from the stdlib's sort + // package) to keep srt on the stack. + qsort(srt, n, sizeof(SyncIndex), compSyncIndex); + + // The smallest index into the array for which the value is acked by a + // quorum. In other words, from the end of the slice, move n/2+1 to the + // left (accounting for zero-indexing). 
+ index = srt[n - (n/2 + 1)]; + if (srt != &srk[0]) { + free(srt); + } + + return index; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index 01bc7da7eb4ef92e9bb8b6fbc681a70856031157..d1acd3e8e954a7dea26adba6f01ad6963c644008 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -17,6 +17,7 @@ #include "sync_raft_restore.h" #include "sync_raft_progress_tracker.h" +static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t); static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and @@ -27,21 +28,26 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA // the Changer only needs a ProgressMap (not a whole Tracker) at which point // this can just take LastIndex and MaxInflight directly instead and cook up // the results from that alone. 
-int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) { +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { SSyncConfChangeSingleArray outgoing; SSyncConfChangeSingleArray incoming; SSyncConfChangeSingleArray css; SSyncRaftProgressTracker* tracker = changer->tracker; - SSyncRaftProgressTrackerConfig* config = &tracker->config; - SSyncRaftProgressMap* progressMap = &tracker->progressMap; int i, ret; + syncRaftInitConfArray(&outgoing); + syncRaftInitConfArray(&incoming); + + syncRaftInitTrackConfig(config); + syncRaftInitProgressMap(progressMap); + ret = toConfChangeSingle(cs, &outgoing, &incoming); if (ret != 0) { goto out; } - if (outgoing.n == 0) { + if (syncRaftConfArrayIsEmpty(&outgoing)) { // No outgoing config, so just apply the incoming changes one by one. for (i = 0; i < incoming.n; ++i) { css = (SSyncConfChangeSingleArray) { @@ -52,6 +58,9 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) if (ret != 0) { goto out; } + + syncRaftCopyTrackerConfig(config, &changer->tracker->config); + syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap); } } else { // The ConfState describes a joint configuration. 
@@ -68,6 +77,8 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) if (ret != 0) { goto out; } + syncRaftCopyTrackerConfig(config, &changer->tracker->config); + syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap); } ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap); @@ -77,11 +88,24 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) } out: - if (incoming.n != 0) free(incoming.changes); - if (outgoing.n != 0) free(outgoing.changes); + syncRaftFreeConfArray(&incoming); + syncRaftFreeConfArray(&outgoing); + return ret; } +static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t) { + SyncNodeId* pId = NULL; + + while (!syncRaftIterateNodeMap(nodeMap, pId)) { + out->changes[*i] = (SSyncConfChangeSingle) { + .type = t, + .nodeId = *pId, + }; + *i += 1; + } +} + // toConfChangeSingle translates a conf state into 1) a slice of operations creating // first the config that will become the outgoing one, and then the incoming one, and // b) another slice that, when applied to the config resulted from 1), represents the @@ -89,15 +113,16 @@ out: static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in) { int i; - out->n = in->n = 0; - - out->n = cs->votersOutgoing.replica; + out->n = syncRaftNodeMapSize(&cs->votersOutgoing); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n); if (out->changes == NULL) { out->n = 0; return -1; } - in->n = cs->votersOutgoing.replica + cs->voters.replica + cs->learners.replica + cs->learnersNext.replica; + in->n = syncRaftNodeMapSize(&cs->votersOutgoing) + + syncRaftNodeMapSize(&cs->voters) + + syncRaftNodeMapSize(&cs->learners) + + syncRaftNodeMapSize(&cs->learnersNext); - out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n); + in->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * 
in->n); if (in->changes == NULL) { in->n = 0; @@ -132,50 +157,24 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA // // as desired. - for (i = 0; i < cs->votersOutgoing.replica; ++i) { - // If there are outgoing voters, first add them one by one so that the - // (non-joint) config has them all. - out->changes[i] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddNode, - .nodeId = cs->votersOutgoing.nodeId[i], - }; - } + // If there are outgoing voters, first add them one by one so that the + // (non-joint) config has them all. + i = 0; + addToConfChangeSingleArray(out, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_AddNode); + assert(i == out->n); // We're done constructing the outgoing slice, now on to the incoming one // (which will apply on top of the config created by the outgoing slice). - + i = 0; + // First, we'll remove all of the outgoing voters. - int j = 0; - for (i = 0; i < cs->votersOutgoing.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_RemoveNode, - .nodeId = cs->votersOutgoing.nodeId[i], - }; - j += 1; - } + addToConfChangeSingleArray(in, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_RemoveNode); + // Then we'll add the incoming voters and learners. - for (i = 0; i < cs->voters.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddNode, - .nodeId = cs->voters.nodeId[i], - }; - j += 1; - } - for (i = 0; i < cs->learners.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddLearnerNode, - .nodeId = cs->learners.nodeId[i], - }; - j += 1; - } - // Same for LearnersNext; these are nodes we want to be learners but which - // are currently voters in the outgoing config. 
- for (i = 0; i < cs->learnersNext.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddLearnerNode, - .nodeId = cs->learnersNext.nodeId[i], - }; - j += 1; - } + addToConfChangeSingleArray(in, &i, &cs->voters, SYNC_RAFT_Conf_AddNode); + addToConfChangeSingleArray(in, &i, &cs->learners, SYNC_RAFT_Conf_AddLearnerNode); + addToConfChangeSingleArray(in, &i, &cs->learnersNext, SYNC_RAFT_Conf_AddLearnerNode); + assert(i == in->n); + return 0; } \ No newline at end of file