未验证 提交 885349af 编写于 作者: C codedump 提交者: GitHub

Merge pull request #8758 from taosdata/feature/sync-implementation

Feature/sync implementation
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "sync.h" #include "sync.h"
#include "sync_type.h" #include "sync_type.h"
#include "thash.h"
#include "raft_message.h" #include "raft_message.h"
#include "sync_raft_impl.h" #include "sync_raft_impl.h"
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
...@@ -43,9 +44,9 @@ struct SSyncRaft { ...@@ -43,9 +44,9 @@ struct SSyncRaft {
// owner sync node // owner sync node
SSyncNode* pNode; SSyncNode* pNode;
SSyncCluster cluster; // hash map nodeId -> SNodeInfo*
SHashObj* nodeInfoMap;
int selfIndex;
SyncNodeId selfId; SyncNodeId selfId;
SyncGroupId selfGroupId; SyncGroupId selfGroupId;
......
...@@ -39,8 +39,6 @@ struct SSyncRaftLog { ...@@ -39,8 +39,6 @@ struct SSyncRaftLog {
SyncIndex commitIndex; SyncIndex commitIndex;
SyncIndex appliedIndex; SyncIndex appliedIndex;
}; };
SSyncRaftLog* syncRaftLogOpen(); SSyncRaftLog* syncRaftLogOpen();
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
#include "syncInt.h" #include "syncInt.h"
#include "sync_type.h" #include "sync_type.h"
// syncRaftReplicate sends an append RPC with new entries to the given peer, // syncRaftMaybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty // if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent // argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but // ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch). // are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty);
#endif /* TD_SYNC_RAFT_REPLICATION_H */ #endif /* TD_SYNC_RAFT_REPLICATION_H */
...@@ -13,13 +13,13 @@ ...@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "raft_configuration.h" #ifndef _TD_LIBS_SYNC_CONST_H
#include "raft.h" #define _TD_LIBS_SYNC_CONST_H
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) { #include "sync.h"
return (int)(id);
}
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) { static int kSyncRaftMaxInflghtMsgs = 20;
return pRaft->cluster.replica;
} static SyncIndex kMaxCommitIndex = UINT64_MAX;
\ No newline at end of file
#endif /* _TD_LIBS_SYNC_CONST_H */
...@@ -33,6 +33,11 @@ struct SSyncRaftChanger { ...@@ -33,6 +33,11 @@ struct SSyncRaftChanger {
typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
// Simple carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
......
...@@ -28,6 +28,8 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft); ...@@ -28,6 +28,8 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType); void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType);
void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType);
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
...@@ -51,4 +53,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); ...@@ -51,4 +53,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState);
void syncRaftBroadcastAppend(SSyncRaft* pRaft); void syncRaftBroadcastAppend(SSyncRaft* pRaft);
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id);
#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */
...@@ -18,54 +18,47 @@ ...@@ -18,54 +18,47 @@
#include "sync.h" #include "sync.h"
/** // Inflights limits the number of MsgApp (represented by the largest index
* SSyncRaftInflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers
* contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever
* use syncRaftInflightFull() to check whether more messages can be sent, // they are sending a new append, and release "quota" via FreeLE() whenever an
* call syncRaftInflightAdd() whenever they are sending a new append, // ack is received.
* and release "quota" via FreeLE() whenever an ack is received.
**/
typedef struct SSyncRaftInflights { typedef struct SSyncRaftInflights {
/* the starting index in the buffer */ // the starting index in the buffer
int start; int start;
/* number of inflights in the buffer */ // number of inflights in the buffer
int count; int count;
/* the size of the buffer */ // the size of the buffer
int size; int size;
/** // buffer contains the index of the last entry
* buffer contains the index of the last entry // inside one message.
* inside one message.
**/
SyncIndex* buffer; SyncIndex* buffer;
} SSyncRaftInflights; } SSyncRaftInflights;
SSyncRaftInflights* syncRaftOpenInflights(int size); SSyncRaftInflights* syncRaftOpenInflights(int size);
void syncRaftCloseInflights(SSyncRaftInflights*); void syncRaftCloseInflights(SSyncRaftInflights*);
// reset frees all inflights.
static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) {
inflights->count = 0; inflights->count = 0;
inflights->start = 0; inflights->start = 0;
} }
// Full returns true if no more messages can be sent at the moment.
static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
return inflights->count == inflights->size; return inflights->count == inflights->size;
} }
/** // Add notifies the Inflights that a new message with the given index is being
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being // dispatched. Full() must be called prior to Add() to verify that there is room
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() // for one more message, and consecutive calls to add Add() must provide a
* to verify that there is room for one more message, // monotonic sequence of indexes.
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
/** // FreeLE frees the inflights smaller or equal to the given `to` flight.
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex);
/** /**
......
...@@ -13,15 +13,37 @@ ...@@ -13,15 +13,37 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H #ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H
#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H #define _TD_LIBS_SYNC_RAFT_NODE_MAP_H
#include "thash.h"
#include "sync.h" #include "sync.h"
#include "sync_type.h" #include "sync_type.h"
// return -1 if cannot find this id struct SSyncRaftNodeMap {
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); SHashObj* nodeIdMap;
};
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap);
void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap);
#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */ void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap);
\ No newline at end of file
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to);
void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to);
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap);
// return true if reach the end
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId);
bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap);
#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */
\ No newline at end of file
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_inflights.h" #include "sync_raft_inflights.h"
#include "thash.h"
/** /**
* State defines how the leader should interact with the follower. * State defines how the leader should interact with the follower.
...@@ -64,141 +65,123 @@ static const char* kProgressStateString[] = { ...@@ -64,141 +65,123 @@ static const char* kProgressStateString[] = {
"Snapshot", "Snapshot",
}; };
/** // Progress represents a follower’s progress in the view of the leader. Leader
* Progress represents a follower’s progress in the view of the leader. Leader maintains // maintains progresses of all followers, and sends entries to the follower
* progresses of all followers, and sends entries to the follower based on its progress. // based on its progress.
**/ //
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
struct SSyncRaftProgress { struct SSyncRaftProgress {
// index in raft cluster config SyncGroupId groupId;
int selfIndex;
SyncNodeId id; SyncNodeId id;
int16_t refCount;
SyncIndex nextIndex; SyncIndex nextIndex;
SyncIndex matchIndex; SyncIndex matchIndex;
/** // State defines how the leader should interact with the follower.
* State defines how the leader should interact with the follower. //
* // When in StateProbe, leader sends at most one replication message
* When in StateProbe, leader sends at most one replication message // per heartbeat interval. It also probes actual progress of the follower.
* per heartbeat interval. It also probes actual progress of the follower. //
* // When in StateReplicate, leader optimistically increases next
* When in StateReplicate, leader optimistically increases next // to the latest entry sent after sending replication message. This is
* to the latest entry sent after sending replication message. This is // an optimized state for fast replicating log entries to the follower.
* an optimized state for fast replicating log entries to the follower. //
* // When in StateSnapshot, leader should have sent out snapshot
* When in StateSnapshot, leader should have sent out snapshot // before and stops sending any replication message.
* before and stops sending any replication message.
**/
ESyncRaftProgressState state; ESyncRaftProgressState state;
/** // PendingSnapshot is used in StateSnapshot.
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. // If there is a pending snapshot, the pendingSnapshot will be set to the
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the // index of the snapshot. If pendingSnapshot is set, the replication process of
* index of the snapshot. If pendingSnapshotIndex is set, the replication process of // this Progress will be paused. raft will not resend snapshot until the pending one
* this Progress will be paused. raft will not resend snapshot until the pending one // is reported to be failed.
* is reported to be failed.
**/
SyncIndex pendingSnapshotIndex; SyncIndex pendingSnapshotIndex;
/** // RecentActive is true if the progress is recently active. Receiving any messages
* recentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active.
* from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout.
* RecentActive can be reset to false after an election timeout. //
**/ // TODO(tbg): the leader should always have this set to true.
bool recentActive; bool recentActive;
/** // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
* probeSent is used while this follower is in StateProbe. When probeSent is // true, raft should pause sending replication message to this peer until
* true, raft should pause sending replication message to this peer until // ProbeSent is reset. See ProbeAcked() and IsPaused().
* probeSent is reset. See ProbeAcked() and IsPaused().
**/
bool probeSent; bool probeSent;
/** // Inflights is a sliding window for the inflight messages.
* inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries.
* Each inflight message contains one or more log entries. // The max number of entries per message is defined in raft config as MaxSizePerMsg.
* The max number of entries per message is defined in raft config as MaxSizePerMsg. // Thus inflight effectively limits both the number of inflight messages
* Thus inflight effectively limits both the number of inflight messages // and the bandwidth each Progress can use.
* and the bandwidth each Progress can use. // When inflights is Full, no more message should be sent.
* When inflights is Full, no more message should be sent. // When a leader sends out a message, the index of the last
* When a leader sends out a message, the index of the last // entry should be added to inflights. The index MUST be added
* entry should be added to inflights. The index MUST be added // into inflights in order.
* into inflights in order. // When a leader receives a reply, the previous inflights should
* When a leader receives a reply, the previous inflights should // be freed by calling inflights.FreeLE with the index of the last
* be freed by calling inflights.FreeLE with the index of the last // received entry.
* received entry.
**/
SSyncRaftInflights* inflights; SSyncRaftInflights* inflights;
/** // IsLearner is true if this progress is tracked for a learner.
* IsLearner is true if this progress is tracked for a learner.
**/
bool isLearner; bool isLearner;
}; };
struct SSyncRaftProgressMap { struct SSyncRaftProgressMap {
SSyncRaftProgress progress[TSDB_MAX_REPLICA]; // map nodeId -> SSyncRaftProgress*
SHashObj* progressMap;
}; };
static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) { static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) {
return kProgressStateString[progress->state]; return kProgressStateString[progress->state];
} }
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress);
/** // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot.
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress);
/** // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress);
/** // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from
* index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true.
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex);
/** // OptimisticUpdate signals that appends all the way up to and including index n
* syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n // are in-flight. As a result, Next is increased to n+1.
* are in-flight. As a result, Next is increased to n+1.
**/
static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) {
progress->nextIndex = nextIndex + 1; progress->nextIndex = nextIndex + 1;
} }
/** // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
* syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and
* arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to.
* the hint that we want to decrease to. //
* // Rejections can happen spuriously as messages are sent out of order or
* Rejections can happen spuriously as messages are sent out of order or // duplicated. In such cases, the rejection pertains to an index that the
* duplicated. In such cases, the rejection pertains to an index that the // Progress already knows were previously acknowledged, and false is returned
* Progress already knows were previously acknowledged, and false is returned // without changing the Progress.
* without changing the Progress. //
* // If the rejection is genuine, Next is lowered sensibly, and the Progress is
* If the rejection is genuine, Next is lowered sensibly, and the Progress is // cleared for sending log entries.
* cleared for sending log entries.
**/
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint); SyncIndex rejected, SyncIndex matchHint);
/** // IsPaused returns whether sending log entries to this node has been throttled.
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. // This is done when a node has rejected recent MsgApps, is currently waiting
* This is done when a node has rejected recent MsgApps, is currently waiting // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal // operation, this is false. A throttled node will be contacted less frequently
* operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of
* until it has reached a state in which it's able to accept a steady stream of // log entries again.
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); bool syncRaftProgressIsPaused(SSyncRaftProgress* progress);
static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) {
...@@ -221,22 +204,35 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres ...@@ -221,22 +204,35 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
return progress->recentActive; return progress->recentActive;
} }
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap);
void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to);
SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id);
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress);
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
/** /**
* return true if progress's log is up-todate * return true if progress's log is up-todate
**/ **/
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex);
void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to);
void syncRaftProgressMapCopy(const SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); // return true if reach the end
bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress);
bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg);
#if 0 #if 0
......
...@@ -21,7 +21,9 @@ ...@@ -21,7 +21,9 @@
#include "sync_raft_quorum_joint.h" #include "sync_raft_quorum_joint.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
#include "thash.h"
// Config reflects the configuration tracked in a ProgressTracker.
struct SSyncRaftProgressTrackerConfig { struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters; SSyncRaftQuorumJointConfig voters;
...@@ -83,34 +85,47 @@ struct SSyncRaftProgressTracker { ...@@ -83,34 +85,47 @@ struct SSyncRaftProgressTracker {
SSyncRaftProgressMap progressMap; SSyncRaftProgressMap progressMap;
ESyncRaftVoteType votes[TSDB_MAX_REPLICA]; // nodeid -> ESyncRaftVoteType map
SHashObj* votesMap;
int maxInflightMsgs; int maxInflightMsgs;
SSyncRaft* pRaft;
}; };
SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft);
void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
// ResetVotes prepares for a new round of vote counting via recordVote.
void syncRaftResetVotes(SSyncRaftProgressTracker*); void syncRaftResetVotes(SSyncRaftProgressTracker*);
typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
/** // RecordVote records that the node with the given id voted for this Raft
* syncRaftRecordVote records that the node with the given id voted for this Raft // instance if v == true (and declined it otherwise).
* instance if v == true (and declined it otherwise). void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant);
**/
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result); void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to);
int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
/** // TallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the // election outcome is known.
* election outcome is known.
**/
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs);
// Committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker);
// QuorumActive returns true if the quorum is active from the view of the local
// raft state machine. Otherwise, it returns false.
bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker);
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define TD_SYNC_RAFT_PROTO_H #define TD_SYNC_RAFT_PROTO_H
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_node_map.h"
typedef enum ESyncRaftConfChangeType { typedef enum ESyncRaftConfChangeType {
SYNC_RAFT_Conf_AddNode = 0, SYNC_RAFT_Conf_AddNode = 0,
...@@ -58,4 +59,19 @@ typedef struct SSyncConfigState { ...@@ -58,4 +59,19 @@ typedef struct SSyncConfigState {
bool autoLeave; bool autoLeave;
} SSyncConfigState; } SSyncConfigState;
static FORCE_INLINE bool syncRaftConfArrayIsEmpty(const SSyncConfChangeSingleArray* ary) {
return ary->n == 0;
}
static FORCE_INLINE void syncRaftInitConfArray(SSyncConfChangeSingleArray* ary) {
*ary = (SSyncConfChangeSingleArray) {
.changes = NULL,
.n = 0,
};
}
static FORCE_INLINE void syncRaftFreeConfArray(SSyncConfChangeSingleArray* ary) {
if (ary->changes != NULL) free(ary->changes);
}
#endif /* TD_SYNC_RAFT_PROTO_H */ #endif /* TD_SYNC_RAFT_PROTO_H */
...@@ -19,24 +19,31 @@ ...@@ -19,24 +19,31 @@
#include "taosdef.h" #include "taosdef.h"
#include "sync.h" #include "sync.h"
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_node_map.h"
#include "thash.h"
/** // JointConfig is a configuration of two groups of (possibly overlapping)
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) // majority configurations. Decisions require the support of both majorities.
* majority configurations. Decisions require the support of both majorities.
**/
typedef struct SSyncRaftQuorumJointConfig { typedef struct SSyncRaftQuorumJointConfig {
SSyncRaftNodeMap outgoing; SSyncRaftNodeMap outgoing;
SSyncRaftNodeMap incoming; SSyncRaftNodeMap incoming;
} SSyncRaftQuorumJointConfig; } SSyncRaftQuorumJointConfig;
/** // IDs returns a newly initialized map representing the set of voters present
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns // in the joint configuration.
* a result indicating whether the vote is pending, lost, or won. A joint quorum void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap);
* requires both majority quorums to vote in favor.
**/
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes);
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); // CommittedIndex returns the largest committed index for the given joint
// quorum. An index is jointly committed if it is committed in both constituent
// majorities.
SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg);
// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
// a result indicating whether the vote is pending, lost, or won. A joint quorum
// requires both majority quorums to vote in favor.
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap);
void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config);
static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
return syncRaftIsInNodeMap(&config->outgoing, id); return syncRaftIsInNodeMap(&config->outgoing, id);
...@@ -59,7 +66,19 @@ static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SS ...@@ -59,7 +66,19 @@ static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SS
} }
static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) {
memset(&config->outgoing, 0, sizeof(SSyncCluster)); syncRaftClearNodeMap(&config->outgoing);
}
static FORCE_INLINE bool syncRaftJointConfigIsIncomingEmpty(const SSyncRaftQuorumJointConfig* config) {
return syncRaftNodeMapSize(&config->incoming) == 0;
}
static FORCE_INLINE bool syncRaftJointConfigIsOutgoingEmpty(const SSyncRaftQuorumJointConfig* config) {
return syncRaftNodeMapSize(&config->outgoing) == 0;
}
static FORCE_INLINE bool syncRaftJointConfigIsInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
return syncRaftIsInNodeMap(&config->outgoing, id);
} }
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "sync.h" #include "sync.h"
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
#include "thash.h"
/** /**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
...@@ -26,6 +27,10 @@ ...@@ -26,6 +27,10 @@
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a * yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached). * quorum of no has been reached).
**/ **/
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes); ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap);
// CommittedIndex computes the committed index from those supplied via the
// provided AckedIndexer (for the active config).
SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
// the Changer only needs a ProgressMap (not a whole Tracker) at which point // the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up // this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone. // the results from that alone.
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs); int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
#endif /* TD_SYNC_RAFT_RESTORE_H */ #endif /* TD_SYNC_RAFT_RESTORE_H */
...@@ -32,6 +32,8 @@ typedef struct SSyncRaftProgress SSyncRaftProgress; ...@@ -32,6 +32,8 @@ typedef struct SSyncRaftProgress SSyncRaftProgress;
typedef struct SSyncRaftProgressMap SSyncRaftProgressMap; typedef struct SSyncRaftProgressMap SSyncRaftProgressMap;
typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig;
typedef struct SSyncRaftNodeMap SSyncRaftNodeMap;
typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker;
typedef struct SSyncRaftChanger SSyncRaftChanger; typedef struct SSyncRaftChanger SSyncRaftChanger;
...@@ -68,11 +70,6 @@ typedef struct SSyncClusterConfig { ...@@ -68,11 +70,6 @@ typedef struct SSyncClusterConfig {
const SSyncCluster* cluster; const SSyncCluster* cluster;
} SSyncClusterConfig; } SSyncClusterConfig;
typedef struct {
int32_t replica;
SyncNodeId nodeId[TSDB_MAX_REPLICA];
} SSyncRaftNodeMap;
typedef enum { typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
SYNC_RAFT_CAMPAIGN_ELECTION = 1, SYNC_RAFT_CAMPAIGN_ELECTION = 1,
...@@ -80,9 +77,6 @@ typedef enum { ...@@ -80,9 +77,6 @@ typedef enum {
} ESyncRaftElectionType; } ESyncRaftElectionType;
typedef enum { typedef enum {
// the init vote resp status
SYNC_RAFT_VOTE_RESP_UNKNOWN = 0,
// grant the vote request // grant the vote request
SYNC_RAFT_VOTE_RESP_GRANT = 1, SYNC_RAFT_VOTE_RESP_GRANT = 1,
...@@ -90,4 +84,8 @@ typedef enum { ...@@ -90,4 +84,8 @@ typedef enum {
SYNC_RAFT_VOTE_RESP_REJECT = 2, SYNC_RAFT_VOTE_RESP_REJECT = 2,
} ESyncRaftVoteType; } ESyncRaftVoteType;
typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg);
typedef void (*matchAckIndexerFp)(SyncNodeId id, void* arg, SyncIndex* index);
#endif /* _TD_LIBS_SYNC_TYPE_H */ #endif /* _TD_LIBS_SYNC_TYPE_H */
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "raft.h" #include "raft.h"
#include "raft_configuration.h" #include "sync_raft_impl.h"
#include "raft_log.h" #include "raft_log.h"
#include "sync_raft_restore.h" #include "sync_raft_restore.h"
#include "raft_replication.h" #include "raft_replication.h"
...@@ -59,8 +59,13 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -59,8 +59,13 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
logStore = &(pRaft->logStore); logStore = &(pRaft->logStore);
fsm = &(pRaft->fsm); fsm = &(pRaft->fsm);
pRaft->nodeInfoMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pRaft->nodeInfoMap == NULL) {
return -1;
}
// init progress tracker // init progress tracker
pRaft->tracker = syncRaftOpenProgressTracker(); pRaft->tracker = syncRaftOpenProgressTracker(pRaft);
if (pRaft->tracker == NULL) { if (pRaft->tracker == NULL) {
return -1; return -1;
} }
...@@ -96,11 +101,22 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -96,11 +101,22 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
.tracker = pRaft->tracker, .tracker = pRaft->tracker,
.lastIndex = syncRaftLogLastIndex(pRaft->log), .lastIndex = syncRaftLogLastIndex(pRaft->log),
}; };
if (syncRaftRestoreConfig(&changer, &confState) < 0) { SSyncRaftProgressTrackerConfig config;
SSyncRaftProgressMap progressMap;
if (syncRaftRestoreConfig(&changer, &confState, &config, &progressMap) < 0) {
syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId); syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId);
return -1; return -1;
} }
// save restored config and progress map to tracker
syncRaftCopyProgressMap(&progressMap, &pRaft->tracker->progressMap);
syncRaftCopyTrackerConfig(&config, &pRaft->tracker->config);
// free progress map and config
syncRaftFreeProgressMap(&progressMap);
syncRaftFreeTrackConfig(&config);
if (!syncRaftIsEmptyServerState(&serverState)) { if (!syncRaftIsEmptyServerState(&serverState)) {
syncRaftLoadState(pRaft, &serverState); syncRaftLoadState(pRaft, &serverState);
} }
...@@ -140,6 +156,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -140,6 +156,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
int32_t syncRaftTick(SSyncRaft* pRaft) { int32_t syncRaftTick(SSyncRaft* pRaft) {
pRaft->currentTick += 1; pRaft->currentTick += 1;
pRaft->tickFp(pRaft);
return 0; return 0;
} }
...@@ -151,8 +168,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch ...@@ -151,8 +168,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch
return 0; return 0;
} }
static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) {
syncRaftReplicate(arg, progress, false); syncRaftMaybeSendAppend(arg, progress, false);
} }
// switchToConfig reconfigures this node to use the provided configuration. It // switchToConfig reconfigures this node to use the provided configuration. It
...@@ -169,13 +186,12 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi ...@@ -169,13 +186,12 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
SSyncRaftProgress* progress = NULL; SSyncRaftProgress* progress = NULL;
syncRaftConfigState(pRaft->tracker, cs); syncRaftConfigState(pRaft->tracker, cs);
i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId); progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId);
exist = (i != -1); exist = (progress != NULL);
// Update whether the node itself is a learner, resetting to false when the // Update whether the node itself is a learner, resetting to false when the
// node is removed. // node is removed.
if (exist) { if (exist) {
progress = &pRaft->tracker->progressMap.progress[i];
pRaft->isLearner = progress->isLearner; pRaft->isLearner = progress->isLearner;
} else { } else {
pRaft->isLearner = false; pRaft->isLearner = false;
...@@ -196,7 +212,7 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi ...@@ -196,7 +212,7 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
// The remaining steps only make sense if this node is the leader and there // The remaining steps only make sense if this node is the leader and there
// are other nodes. // are other nodes.
if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) { if (pRaft->state != TAOS_SYNC_STATE_LEADER || syncRaftNodeMapSize(&cs->voters) == 0) {
return; return;
} }
...@@ -212,8 +228,11 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi ...@@ -212,8 +228,11 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
// If the the leadTransferee was removed or demoted, abort the leadership transfer. // If the the leadTransferee was removed or demoted, abort the leadership transfer.
SyncNodeId leadTransferee = pRaft->leadTransferee; SyncNodeId leadTransferee = pRaft->leadTransferee;
if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { if (leadTransferee != SYNC_NON_NODE_ID) {
abortLeaderTransfer(pRaft); if (!syncRaftIsInNodeMap(&pRaft->tracker->config.voters.incoming, leadTransferee) &&
!syncRaftIsInNodeMap(&pRaft->tracker->config.voters.outgoing, leadTransferee)) {
abortLeaderTransfer(pRaft);
}
} }
} }
} }
...@@ -286,8 +305,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) ...@@ -286,8 +305,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
* but it will not receive MsgApp or MsgHeartbeat, so it will not create * but it will not receive MsgApp or MsgHeartbeat, so it will not create
* disruptive term increases * disruptive term increases
**/ **/
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (peerIndex < 0) { if (pNode == NULL) {
return true; return true;
} }
SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
...@@ -295,7 +314,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) ...@@ -295,7 +314,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
return true; return true;
} }
pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex])); pRaft->io.send(msg, pNode);
} else { } else {
// ignore other cases // ignore other cases
syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]", syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
......
...@@ -16,15 +16,14 @@ ...@@ -16,15 +16,14 @@
#include "syncInt.h" #include "syncInt.h"
#include "raft.h" #include "raft.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_configuration.h" #include "sync_raft_impl.h"
#include "raft_message.h" #include "raft_message.h"
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries);
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (pNode == NULL) {
if (peerIndex < 0) {
return 0; return 0;
} }
...@@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs ...@@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index); pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index);
out: out:
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex])); pRaft->io.send(pRespMsg, pNode);
return 0; return 0;
} }
\ No newline at end of file
...@@ -19,24 +19,6 @@ ...@@ -19,24 +19,6 @@
#include "raft_message.h" #include "raft_message.h"
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (pRaft->state == TAOS_SYNC_STATE_LEADER) {
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
if (!syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
// if there is pending uncommitted config,cannot start election
if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) {
syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply",
pRaft->selfGroupId, pRaft->selfId, pRaft->term);
return 0;
}
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRaft->preVote) { if (pRaft->preVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION);
} else { } else {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncInt.h" #include "syncInt.h"
#include "raft.h" #include "raft.h"
#include "raft_configuration.h" #include "sync_raft_impl.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_message.h" #include "raft_message.h"
...@@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); ...@@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SSyncMessage* pRespMsg; SSyncMessage* pRespMsg;
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (voteIndex == -1) { if (pNode == NULL) {
return 0; return 0;
} }
bool grant; bool grant;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
...@@ -42,17 +43,19 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -42,17 +43,19 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
grant ? "grant" : "reject", grant ? "grant" : "reject",
pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term);
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex])); pRaft->io.send(pRespMsg, pNode);
return 0; return 0;
} }
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) { bool canVote =
return false; // We can vote if this is a repeat of a vote we've already cast...
} pRaft->voteFor == pMsg->from ||
if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { // ...we haven't voted and we don't think there's a leader yet in this term...
return false; (pRaft->voteFor == SYNC_NON_NODE_ID && pRaft->leaderId == SYNC_NON_NODE_ID) ||
} // ...or this is a PreVote for a future term...
(pMsg->vote.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION && pMsg->term > pRaft->term);
return true; // ...and we believe the candidate is up to date.
return canVote && syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm);
} }
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncInt.h" #include "syncInt.h"
#include "raft.h" #include "raft.h"
#include "raft_configuration.h" #include "sync_raft_impl.h"
#include "raft_message.h" #include "raft_message.h"
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...@@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE); assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE);
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (voterIndex == -1) { if (pNode == NULL) {
syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
return 0; return 0;
} }
...@@ -45,12 +45,14 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -45,12 +45,14 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (result == SYNC_RAFT_VOTE_WON) { if (result == SYNC_RAFT_VOTE_WON) {
if (pRaft->candidateState.inPreVote) { if (pRaft->candidateState.inPreVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); syncRaftCampaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else { } else {
syncRaftBecomeLeader(pRaft); syncRaftBecomeLeader(pRaft);
syncRaftBroadcastAppend(pRaft);
} }
} else if (result == SYNC_RAFT_VOTE_LOST) { } else if (result == SYNC_RAFT_VOTE_LOST) {
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
} }
......
...@@ -22,14 +22,14 @@ ...@@ -22,14 +22,14 @@
static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress);
static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
SyncIndex prevIndex, SyncTerm prevTerm, SyncIndex prevIndex, SyncTerm prevTerm,
const SSyncRaftEntry *entries, int nEntry); SSyncRaftEntry *entries, int nEntry);
// syncRaftReplicate sends an append RPC with new entries to the given peer, // maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty // if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent // argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but // ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch). // are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) {
assert(pRaft->state == TAOS_SYNC_STATE_LEADER); assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
SyncNodeId nodeId = progress->id; SyncNodeId nodeId = progress->id;
...@@ -68,10 +68,13 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { ...@@ -68,10 +68,13 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
SyncIndex prevIndex, SyncTerm prevTerm, SyncIndex prevIndex, SyncTerm prevTerm,
const SSyncRaftEntry *entries, int nEntry) { SSyncRaftEntry *entries, int nEntry) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id);
if (pNode == NULL) {
return false;
}
SyncIndex lastIndex; SyncIndex lastIndex;
SyncTerm logTerm = prevTerm; SyncTerm logTerm = prevTerm;
SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]);
SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
prevIndex, prevTerm, pRaft->log->commitIndex, prevIndex, prevTerm, pRaft->log->commitIndex,
...@@ -87,7 +90,7 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, ...@@ -87,7 +90,7 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
case PROGRESS_STATE_REPLICATE: case PROGRESS_STATE_REPLICATE:
lastIndex = entries[nEntry - 1].index; lastIndex = entries[nEntry - 1].index;
syncRaftProgressOptimisticNextIndex(progress, lastIndex); syncRaftProgressOptimisticNextIndex(progress, lastIndex);
syncRaftInflightAdd(&progress->inflights, lastIndex); syncRaftInflightAdd(progress->inflights, lastIndex);
break; break;
case PROGRESS_STATE_PROBE: case PROGRESS_STATE_PROBE:
progress->probeSent = true; progress->probeSent = true;
......
...@@ -99,7 +99,7 @@ void syncCleanUp() { ...@@ -99,7 +99,7 @@ void syncCleanUp() {
SSyncNode* syncStart(const SSyncInfo* pInfo) { SSyncNode* syncStart(const SSyncInfo* pInfo) {
pthread_mutex_lock(&gSyncManager->mutex); pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId)); SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId*));
if (ppNode != NULL) { if (ppNode != NULL) {
syncInfo("vgroup %d already exist", pInfo->vgId); syncInfo("vgroup %d already exist", pInfo->vgId);
pthread_mutex_unlock(&gSyncManager->mutex); pthread_mutex_unlock(&gSyncManager->mutex);
...@@ -140,7 +140,7 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { ...@@ -140,7 +140,7 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) {
void syncStop(const SSyncNode* pNode) { void syncStop(const SSyncNode* pNode) {
pthread_mutex_lock(&gSyncManager->mutex); pthread_mutex_lock(&gSyncManager->mutex);
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId*));
if (ppNode == NULL) { if (ppNode == NULL) {
syncInfo("vgroup %d not exist", pNode->vgId); syncInfo("vgroup %d not exist", pNode->vgId);
pthread_mutex_unlock(&gSyncManager->mutex); pthread_mutex_unlock(&gSyncManager->mutex);
...@@ -288,7 +288,7 @@ static void *syncWorkerMain(void *argv) { ...@@ -288,7 +288,7 @@ static void *syncWorkerMain(void *argv) {
static void syncNodeTick(void *param, void *tmrId) { static void syncNodeTick(void *param, void *tmrId) {
SyncGroupId vgId = (SyncGroupId)param; SyncGroupId vgId = (SyncGroupId)param;
SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId)); SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId*));
if (ppNode == NULL) { if (ppNode == NULL) {
return; return;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "raft.h"
#include "syncInt.h" #include "syncInt.h"
#include "sync_raft_config_change.h" #include "sync_raft_config_change.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
...@@ -40,40 +41,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* ...@@ -40,40 +41,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig*
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id); SSyncRaftProgressMap* progressMap, SyncNodeId id);
static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id); SSyncRaftProgressMap* progressMap, SyncNodeId id);
// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int ret;
ret = checkAndCopy(changer, config, progressMap);
if (ret != 0) {
return ret;
}
if (hasJointConfig(config)) {
syncError("can't apply simple config change in joint config");
return -1;
}
ret = applyConfig(changer, config, progressMap, css);
if (ret != 0) {
return ret;
}
int n = symDiff(syncRaftJointConfigIncoming(&changer->tracker->config.voters),
syncRaftJointConfigIncoming(&config->voters));
if (n > 1) {
syncError("more than one voter changed without entering joint config");
return -1;
}
return checkAndReturn(config, progressMap);
}
// EnterJoint verifies that the outgoing (=right) majority config of the joint // EnterJoint verifies that the outgoing (=right) majority config of the joint
// config is empty and initializes it with a copy of the incoming (=left) // config is empty and initializes it with a copy of the incoming (=left)
...@@ -96,12 +64,13 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S ...@@ -96,12 +64,13 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
if (hasJointConfig(config)) { if (hasJointConfig(config)) {
syncError("config is already joint"); syncError("config is already joint");
return -1; return -1;
} }
if(config->voters.incoming.replica == 0) { if(syncRaftJointConfigIsIncomingEmpty(&config->voters) == 0) {
// We allow adding nodes to an empty config for convenience (testing and // We allow adding nodes to an empty config for convenience (testing and
// bootstrap), but you can't enter a joint state. // bootstrap), but you can't enter a joint state.
syncError("can't make a zero-voter config joint"); syncError("can't make a zero-voter config joint");
...@@ -112,7 +81,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S ...@@ -112,7 +81,7 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
syncRaftJointConfigClearOutgoing(&config->voters); syncRaftJointConfigClearOutgoing(&config->voters);
// Copy incoming to outgoing. // Copy incoming to outgoing.
memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster)); syncRaftCopyNodeMap(&config->voters.incoming, &config->voters.outgoing);
ret = applyConfig(changer, config, progressMap, css); ret = applyConfig(changer, config, progressMap, css);
if (ret != 0) { if (ret != 0) {
...@@ -123,84 +92,43 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S ...@@ -123,84 +92,43 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const S
return checkAndReturn(config, progressMap); return checkAndReturn(config, progressMap);
} }
// checkAndCopy copies the tracker's config and progress map (deeply enough for // Simple carries out a series of configuration changes that (in aggregate)
// the purposes of the Changer) and returns those copies. It returns an error // mutates the incoming majority config Voters[0] by at most one. This method
// if checkInvariants does. // will return an error if that is not the case, if the resulting quorum is
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { // zero, or if the configuration is in a joint state (i.e. if there is an
syncRaftCloneTrackerConfig(&changer->tracker->config, config); // outgoing configuration).
int i; int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
SSyncRaftProgress* progress = &(changer->tracker->progressMap.progress[i]); int ret;
if (progress->id == SYNC_NON_NODE_ID) {
continue; ret = checkAndCopy(changer, config, progressMap);
} if (ret != 0) {
syncRaftCopyProgress(progress, &(progressMap->progress[i])); return ret;
} }
return checkAndReturn(config, progressMap);
}
// checkAndReturn calls checkInvariants on the input and returns either the if (hasJointConfig(config)) {
// resulting error or the input. syncError("can't apply simple config change in joint config");
static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
if (checkInvariants(config, progressMap) != 0) {
return -1; return -1;
} }
return 0; ret = applyConfig(changer, config, progressMap, css);
}
// checkInvariants makes sure that the config and progress are compatible with
// each other. This is used to check both what the Changer is initialized with,
// as well as what it returns.
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int ret = syncRaftCheckProgress(config, progressMap);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
int i; int n = symDiff(syncRaftJointConfigIncoming(&changer->tracker->config.voters),
// Any staged learner was staged because it could not be directly added due syncRaftJointConfigIncoming(&config->voters));
// to a conflicting voter in the outgoing config. if (n > 1) {
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { syncError("more than one voter changed without entering joint config");
if (!syncRaftJointConfigInOutgoing(&config->voters, config->learnersNext.nodeId[i])) { return -1;
return -1;
}
if (progressMap->progress[i].id != SYNC_NON_NODE_ID && progressMap->progress[i].isLearner) {
syncError("%d is in LearnersNext, but is already marked as learner", progressMap->progress[i].id);
return -1;
}
}
// Conversely Learners and Voters doesn't intersect at all.
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (syncRaftJointConfigInIncoming(&config->voters, config->learners.nodeId[i])) {
syncError("%d is in Learners and voter.incoming", progressMap->progress[i].id);
return -1;
}
if (progressMap->progress[i].id != SYNC_NON_NODE_ID && !progressMap->progress[i].isLearner) {
syncError("%d is in Learners, but is not marked as learner", progressMap->progress[i].id);
return -1;
}
}
if (!hasJointConfig(config)) {
// We enforce that empty maps are nil instead of zero.
if (config->learnersNext.replica > 0) {
syncError("cfg.LearnersNext must be nil when not joint");
return -1;
}
if (config->autoLeave) {
syncError("AutoLeave must be false when not joint");
return -1;
}
} }
return 0; return checkAndReturn(config, progressMap);
}
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
return config->voters.outgoing.replica > 0;
} }
// apply a change to the configuration. By convention, changes to voters are
// always made to the incoming majority config Voters[0]. Voters[1] is either
// empty or preserves the outgoing majority configuration while in a joint state.
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) {
int i; int i;
...@@ -227,7 +155,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig ...@@ -227,7 +155,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
} }
} }
if (config->voters.incoming.replica == 0) { if (syncRaftJointConfigIsIncomingEmpty(&config->voters)) {
syncError("removed all voters"); syncError("removed all voters");
return -1; return -1;
} }
...@@ -235,86 +163,16 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig ...@@ -235,86 +163,16 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
return 0; return 0;
} }
// symdiff returns the count of the symmetric difference between the sets of
// uint64s, i.e. len( (l - r) \union (r - l)).
static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
int n;
int i;
int j0, j1;
const SSyncRaftNodeMap* pairs[2][2] = {
{l, r}, // count elems in l but not in r
{r, l}, // count elems in r but not in l
};
for (n = 0, i = 0; i < 2; ++i) {
const SSyncRaftNodeMap** pp = pairs[i];
const SSyncRaftNodeMap* p0 = pp[0];
const SSyncRaftNodeMap* p1 = pp[1];
for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) {
SyncNodeId id = p0->nodeId[j0];
if (id == SYNC_NON_NODE_ID) {
continue;
}
for (j1 = 0; j1 < p1->replica; ++j1) {
if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) {
n+=1;
}
}
}
}
return n;
}
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) {
}
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
int i;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (nodeMap->nodeId[i] == id) {
nodeMap->replica -= 1;
nodeMap->nodeId[i] = SYNC_NON_NODE_ID;
break;
}
}
assert(nodeMap->replica >= 0);
}
// nilAwareAdd populates a map entry, creating the map if necessary.
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
int i, j;
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
if (nodeMap->nodeId[i] == id) {
return;
}
if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) {
j = i;
}
}
assert(j != -1);
nodeMap->nodeId[j] = id;
nodeMap->replica += 1;
}
// makeVoter adds or promotes the given ID to be a voter in the incoming // makeVoter adds or promotes the given ID to be a voter in the incoming
// majority config. // majority config.
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i = syncRaftFindProgressIndexByNodeId(progressMap, id); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
if (i == -1) { if (progress == NULL) {
initProgress(changer, config, progressMap, id, false); initProgress(changer, config, progressMap, id, false);
i = syncRaftFindProgressIndexByNodeId(progressMap, id); return;
} }
assert(i != -1);
SSyncRaftProgress* progress = &(progressMap->progress[i]);
progress->isLearner = false; progress->isLearner = false;
nilAwareDelete(&config->learners, id); nilAwareDelete(&config->learners, id);
...@@ -337,14 +195,12 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* ...@@ -337,14 +195,12 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig*
// LeaveJoint(). // LeaveJoint().
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i = syncRaftFindProgressIndexByNodeId(progressMap, id); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
if (i == -1) { if (progress == NULL) {
initProgress(changer, config, progressMap, id, false); initProgress(changer, config, progressMap, id, true);
i = syncRaftFindProgressIndexByNodeId(progressMap, id); return;
} }
assert(i != -1);
SSyncRaftProgress* progress = &(progressMap->progress[i]);
if (progress->isLearner) { if (progress->isLearner) {
return; return;
} }
...@@ -352,15 +208,15 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi ...@@ -352,15 +208,15 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
removeNodeId(changer, config, progressMap, id); removeNodeId(changer, config, progressMap, id);
// ... but save the Progress. // ... but save the Progress.
syncRaftAddToProgressMap(progressMap, id); syncRaftAddToProgressMap(progressMap, progress);
// Use LearnersNext if we can't add the learner to Learners directly, i.e. // Use LearnersNext if we can't add the learner to Learners directly, i.e.
// if the peer is still tracked as a voter in the outgoing config. It will // if the peer is still tracked as a voter in the outgoing config. It will
// be turned into a learner in LeaveJoint(). // be turned into a learner in LeaveJoint().
// //
// Otherwise, add a regular learner right away. // Otherwise, add a regular learner right away.
bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); bool inInOutgoing = syncRaftJointConfigIsInOutgoing(&config->voters, id);
if (inOutgoing) { if (inInOutgoing) {
nilAwareAdd(&config->learnersNext, id); nilAwareAdd(&config->learnersNext, id);
} else { } else {
nilAwareAdd(&config->learners, id); nilAwareAdd(&config->learners, id);
...@@ -371,8 +227,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi ...@@ -371,8 +227,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
// removeNodeId this peer as a voter or learner from the incoming config. // removeNodeId this peer as a voter or learner from the incoming config.
static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i = syncRaftFindProgressIndexByNodeId(progressMap, id); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
if (i == -1) { if (progress == NULL) {
return; return;
} }
...@@ -381,8 +237,173 @@ static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf ...@@ -381,8 +237,173 @@ static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf
nilAwareDelete(&config->learnersNext, id); nilAwareDelete(&config->learnersNext, id);
// If the peer is still a voter in the outgoing config, keep the Progress. // If the peer is still a voter in the outgoing config, keep the Progress.
bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); bool inInOutgoing = syncRaftJointConfigIsInOutgoing(&config->voters, id);
if (!inOutgoing) { if (!inInOutgoing) {
syncRaftRemoveFromProgressMap(progressMap, id); syncRaftRemoveFromProgressMap(progressMap, id);
} }
}
// initProgress initializes a new progress for the given node or learner.
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) {
if (!isLearner) {
syncRaftJointConfigAddToIncoming(&config->voters, id);
} else {
nilAwareAdd(&config->learners, id);
}
SSyncRaftProgress* pProgress = (SSyncRaftProgress*)malloc(sizeof(SSyncRaftProgress));
assert (pProgress != NULL);
*pProgress = (SSyncRaftProgress) {
// Initializing the Progress with the last index means that the follower
// can be probed (with the last index).
//
// TODO(tbg): seems awfully optimistic. Using the first index would be
// better. The general expectation here is that the follower has no log
// at all (and will thus likely need a snapshot), though the app may
// have applied a snapshot out of band before adding the replica (thus
// making the first index the better choice).
.id = id,
.groupId = changer->tracker->pRaft->selfGroupId,
.nextIndex = changer->lastIndex,
.matchIndex = 0,
.state = PROGRESS_STATE_PROBE,
.pendingSnapshotIndex = 0,
.probeSent = false,
.inflights = syncRaftOpenInflights(changer->tracker->maxInflightMsgs),
.isLearner = isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
.recentActive = true,
.refCount = 0,
};
syncRaftAddToProgressMap(progressMap, pProgress);
}
// checkInvariants makes sure that the config and progress are compatible with
// each other. This is used to check both what the Changer is initialized with,
// as well as what it returns.
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int ret = syncRaftCheckTrackerConfigInProgress(config, progressMap);
if (ret != 0) {
return ret;
}
// Any staged learner was staged because it could not be directly added due
// to a conflicting voter in the outgoing config.
SyncNodeId* pNodeId = NULL;
while (!syncRaftIterateNodeMap(&config->learnersNext, pNodeId)) {
SyncNodeId nodeId = *pNodeId;
if (!syncRaftJointConfigInOutgoing(&config->voters, nodeId)) {
syncError("[%d] is in LearnersNext, but not outgoing", nodeId);
return -1;
}
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
assert(progress);
assert(progress->id == nodeId);
if (progress->isLearner) {
syncError("[%d:%d] is in LearnersNext, but is already marked as learner", progress->groupId, nodeId);
return -1;
}
}
// Conversely Learners and Voters doesn't intersect at all.
pNodeId = NULL;
while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) {
SyncNodeId nodeId = *pNodeId;
if (syncRaftJointConfigInOutgoing(&config->voters, nodeId)) {
syncError("%d is in Learners and outgoing", nodeId);
return -1;
}
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, nodeId);
assert(progress);
assert(progress->id == nodeId);
if (!progress->isLearner) {
syncError("[%d:%d] is in Learners, but is not marked as learner", progress->groupId, nodeId);
return -1;
}
}
if (!hasJointConfig(config)) {
// We enforce that empty maps are nil instead of zero.
if (syncRaftNodeMapSize(&config->learnersNext) > 0) {
syncError("cfg.LearnersNext must be nil when not joint");
return -1;
}
if (config->autoLeave) {
syncError("AutoLeave must be false when not joint");
return -1;
}
}
return 0;
}
// checkAndCopy copies the tracker's config and progress map (deeply enough for
// the purposes of the Changer) and returns those copies. It returns an error
// if checkInvariants does.
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
syncRaftCopyTrackerConfig(&changer->tracker->config, config);
syncRaftClearProgressMap(progressMap);
SSyncRaftProgress* pProgress = NULL;
while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {
syncRaftAddToProgressMap(progressMap, pProgress);
}
return checkAndReturn(config, progressMap);
}
// checkAndReturn calls checkInvariants on the input and returns either the
// resulting error or the input.
static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
if (checkInvariants(config, progressMap) != 0) {
return -1;
}
return 0;
}
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
return !syncRaftJointConfigIsOutgoingEmpty(&config->voters);
}
// symdiff returns the count of the symmetric difference between the sets of
// uint64s, i.e. len( (l - r) \union (r - l)).
static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
int n;
int i;
int j0, j1;
const SSyncRaftNodeMap* pairs[2][2] = {
{l, r}, // count elems in l but not in r
{r, l}, // count elems in r but not in l
};
for (n = 0, i = 0; i < 2; ++i) {
const SSyncRaftNodeMap** pp = pairs[i];
const SSyncRaftNodeMap* p0 = pp[0];
const SSyncRaftNodeMap* p1 = pp[1];
SyncNodeId* pNodeId;
while (!syncRaftIterateNodeMap(p0, pNodeId)) {
if (!syncRaftIsInNodeMap(p1, *pNodeId)) {
n+=1;
}
}
}
return n;
}
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
syncRaftRemoveFromNodeMap(nodeMap, id);
}
// nilAwareAdd populates a map entry, creating the map if necessary.
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
syncRaftAddToNodeMap(nodeMap, id);
} }
\ No newline at end of file
...@@ -17,15 +17,40 @@ ...@@ -17,15 +17,40 @@
#include "raft.h" #include "raft.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_message.h" #include "raft_message.h"
#include "sync_raft_progress_tracker.h"
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
SyncTerm term; if (pRaft->state == TAOS_SYNC_STATE_LEADER) {
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
return;
}
if (!syncRaftIsPromotable(pRaft)) {
syncWarn("[%d:%d] is unpromotable and can not syncRaftCampaign", pRaft->selfGroupId, pRaft->selfId);
return;
}
// if there is pending uncommitted config,cannot start election
if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) {
syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply",
pRaft->selfGroupId, pRaft->selfId, pRaft->term);
return;
}
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
syncRaftCampaign(pRaft, cType);
}
// syncRaftCampaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
bool preVote; bool preVote;
ESyncRaftMessageType voteMsgType; SyncTerm term;
if (syncRaftIsPromotable(pRaft)) { if (syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); syncDebug("[%d:%d] is unpromotable; syncRaftCampaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
return 0; return;
} }
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
...@@ -35,7 +60,6 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { ...@@ -35,7 +60,6 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
term = pRaft->term + 1; term = pRaft->term + 1;
} else { } else {
syncRaftBecomeCandidate(pRaft); syncRaftBecomeCandidate(pRaft);
voteMsgType = RAFT_MSG_VOTE;
term = pRaft->term; term = pRaft->term;
preVote = false; preVote = false;
} }
...@@ -43,10 +67,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { ...@@ -43,10 +67,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
int quorum = syncRaftQuorum(pRaft); int quorum = syncRaftQuorum(pRaft);
ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
if (result == SYNC_RAFT_VOTE_WON) { if (result == SYNC_RAFT_VOTE_WON) {
/** // We won the election after voting for ourselves (which must mean that
* We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state.
* this is a single-node cluster). Advance to the next state.
**/
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else { } else {
...@@ -59,12 +81,23 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { ...@@ -59,12 +81,23 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
int i; int i;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
for (i = 0; i < pRaft->cluster.replica; ++i) { SSyncRaftNodeMap nodeMap;
if (i == pRaft->cluster.selfIndex) { syncRaftJointConfigIDs(&pRaft->tracker->config.voters, &nodeMap);
SyncNodeId *pNodeId = NULL;
while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) {
SyncNodeId nodeId = *pNodeId;
if (nodeId == SYNC_NON_NODE_ID) {
continue; continue;
} }
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; if (nodeId == pRaft->selfId) {
continue;
}
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, nodeId);
if (pNode == NULL) {
continue;
}
SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId,
term, cType, lastIndex, lastTerm); term, cType, lastIndex, lastTerm);
...@@ -72,10 +105,10 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { ...@@ -72,10 +105,10 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
continue; continue;
} }
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "", syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent vote request to %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm, pRaft->selfGroupId, pRaft->selfId, lastTerm,
lastIndex, voteMsgType, nodeId, pRaft->term); lastIndex, nodeId, pRaft->term);
pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); pRaft->io.send(pMsg, pNode);
} }
} }
\ No newline at end of file
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "raft.h" #include "raft.h"
#include "raft_configuration.h" #include "sync_raft_impl.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_replication.h" #include "raft_replication.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
...@@ -25,6 +25,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); ...@@ -25,6 +25,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static int triggerAll(SSyncRaft* pRaft); static int triggerAll(SSyncRaft* pRaft);
static void tickElection(SSyncRaft* pRaft); static void tickElection(SSyncRaft* pRaft);
...@@ -82,13 +84,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { ...@@ -82,13 +84,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
resetRaft(pRaft, pRaft->term); resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId; pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_STATE_LEADER; pRaft->state = TAOS_SYNC_STATE_LEADER;
// TODO: check if there is pending config log
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
if (nPendingConf > 1) {
syncFatal("unexpected multiple uncommitted config entry");
}
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
assert(progress != NULL);
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
syncRaftProgressBecomeReplicate(progress);
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
pRaft->pendingConfigIndex = lastIndex;
// after become leader, send a no-op log // after become leader, send a no-op log
SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
...@@ -103,6 +114,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { ...@@ -103,6 +114,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
}; };
appendEntries(pRaft, entry, 1); appendEntries(pRaft, entry, 1);
//syncRaftTriggerHeartbeat(pRaft); //syncRaftTriggerHeartbeat(pRaft);
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
} }
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
...@@ -123,15 +135,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { ...@@ -123,15 +135,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
} }
int syncRaftQuorum(SSyncRaft* pRaft) { int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->cluster.replica / 2 + 1; return 0;
//return pRaft->cluster.replica / 2 + 1;
} }
ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool grant, bool preVote, bool grant,
int* rejected, int *granted) { int* rejected, int *granted) {
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id);
if (voterIndex == -1) { if (pNode == NULL) {
return SYNC_RAFT_VOTE_PENDING; return true;
} }
if (grant) { if (grant) {
...@@ -142,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, ...@@ -142,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} }
syncRaftRecordVote(pRaft->tracker, voterIndex, grant); syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant);
return syncRaftTallyVotes(pRaft->tracker, rejected, granted); return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
} }
/* /*
...@@ -154,7 +167,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, ...@@ -154,7 +167,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
} }
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); int voteIndex = syncRaftGetNodeById(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
...@@ -185,19 +198,30 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { ...@@ -185,19 +198,30 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
pRaft->voteFor = serverState->voteFor; pRaft->voteFor = serverState->voteFor;
} }
static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) { static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) {
SSyncRaft* pRaft = (SSyncRaft*)arg; SSyncRaft* pRaft = (SSyncRaft*)arg;
if (pRaft->selfId == progress->id) { if (pRaft->selfId == progress->id) {
return; return;
} }
syncRaftReplicate(arg, progress, true); syncRaftMaybeSendAppend(arg, progress, true);
} }
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
void syncRaftBroadcastAppend(SSyncRaft* pRaft) { void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
} }
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) {
SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SyncNodeId*));
if (ppNode != NULL) {
return *ppNode;
}
return NULL;
}
static int convertClear(SSyncRaft* pRaft) { static int convertClear(SSyncRaft* pRaft) {
} }
...@@ -223,7 +247,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -223,7 +247,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
syncRaftHandleVoteRespMessage(pRaft, pMsg); syncRaftHandleVoteRespMessage(pRaft, pMsg);
return 0; return 0;
} else if (msgType == RAFT_MSG_APPEND) { } else if (msgType == RAFT_MSG_APPEND) {
syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from); syncRaftBecomeFollower(pRaft, pMsg->term, pMsg->from);
syncRaftHandleAppendEntriesMessage(pRaft, pMsg); syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
} }
return 0; return 0;
...@@ -234,9 +258,7 @@ static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -234,9 +258,7 @@ static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0; return 0;
} }
/** // tickElection is run by followers and candidates after r.electionTimeout.
* tickElection is run by followers and candidates per tick.
**/
static void tickElection(SSyncRaft* pRaft) { static void tickElection(SSyncRaft* pRaft) {
pRaft->electionElapsed += 1; pRaft->electionElapsed += 1;
...@@ -254,10 +276,16 @@ static void tickElection(SSyncRaft* pRaft) { ...@@ -254,10 +276,16 @@ static void tickElection(SSyncRaft* pRaft) {
syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
} }
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
static void tickHeartbeat(SSyncRaft* pRaft) { static void tickHeartbeat(SSyncRaft* pRaft) {
} }
// TODO
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
return false;
}
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm term = pRaft->term; SyncTerm term = pRaft->term;
...@@ -268,9 +296,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { ...@@ -268,9 +296,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
entries[i].index = lastIndex + 1 + i; entries[i].index = lastIndex + 1 + i;
} }
// Track the size of this uncommitted proposal.
if (!increaseUncommittedSize(pRaft, entries, n)) {
// Drop the proposal.
return;
}
syncRaftLogAppend(pRaft->log, entries, n); syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
assert(progress != NULL);
syncRaftProgressMaybeUpdate(progress, lastIndex); syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend.
syncRaftMaybeCommit(pRaft); syncRaftMaybeCommit(pRaft);
...@@ -297,7 +332,7 @@ static int triggerAll(SSyncRaft* pRaft) { ...@@ -297,7 +332,7 @@ static int triggerAll(SSyncRaft* pRaft) {
continue; continue;
} }
syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true); syncRaftMaybeSendAppend(pRaft, pRaft->tracker->progressMap.progress[i], true);
} }
#endif #endif
return 0; return 0;
...@@ -307,8 +342,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { ...@@ -307,8 +342,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID; pRaft->leadTransferee = SYNC_NON_NODE_ID;
} }
static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { static void resetProgress(SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress(i, (SSyncRaft*)arg, progress); syncRaftResetProgress((SSyncRaft*)arg, progress);
} }
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
...@@ -327,7 +362,7 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { ...@@ -327,7 +362,7 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
abortLeaderTransfer(pRaft); abortLeaderTransfer(pRaft);
syncRaftResetVotes(pRaft->tracker); syncRaftResetVotes(pRaft->tracker);
syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); syncRaftProgressVisit(pRaft->tracker, resetProgress, pRaft);
pRaft->pendingConfigIndex = 0; pRaft->pendingConfigIndex = 0;
pRaft->uncommittedSize = 0; pRaft->uncommittedSize = 0;
......
...@@ -40,19 +40,16 @@ void syncRaftCloseInflights(SSyncRaftInflights* inflights) { ...@@ -40,19 +40,16 @@ void syncRaftCloseInflights(SSyncRaftInflights* inflights) {
free(inflights); free(inflights);
} }
/** // Add notifies the Inflights that a new message with the given index is being
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being // dispatched. Full() must be called prior to Add() to verify that there is room
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() // for one more message, and consecutive calls to add Add() must provide a
* to verify that there is room for one more message, // monotonic sequence of indexes.
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
assert(!syncRaftInflightFull(inflights)); assert(!syncRaftInflightFull(inflights));
int next = inflights->start + inflights->count; int next = inflights->start + inflights->count;
int size = inflights->size; int size = inflights->size;
/* is next wrapped around buffer? */
if (next >= size) { if (next >= size) {
next -= size; next -= size;
} }
...@@ -61,12 +58,10 @@ void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) ...@@ -61,12 +58,10 @@ void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex)
inflights->count++; inflights->count++;
} }
/** // FreeLE frees the inflights smaller or equal to the given `to` flight.
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
/* out of the left side of the window */ // out of the left side of the window
return; return;
} }
...@@ -95,10 +90,8 @@ void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { ...@@ -95,10 +90,8 @@ void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
} }
} }
/** // FreeFirstOne releases the first inflight. This is a no-op if nothing is
* syncRaftInflightFreeFirstOne releases the first inflight. // inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]); syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]);
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_node_map.h"
#include "sync_type.h"
#include "sync_raft_progress.h"
void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) {
nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
}
void syncRaftFreeNodeMap(SSyncRaftNodeMap* nodeMap) {
taosHashCleanup(nodeMap->nodeIdMap);
}
void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap) {
taosHashClear(nodeMap->nodeIdMap);
}
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
SyncNodeId** ppId = (SyncNodeId**)taosHashGet(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*));
if (ppId == NULL) {
return false;
}
return true;
}
void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) {
SyncNodeId *pId = NULL;
while (!syncRaftIterateNodeMap(from, pId)) {
taosHashPut(to->nodeIdMap, &pId, sizeof(SyncNodeId*), &pId, sizeof(SyncNodeId*));
}
}
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) {
SyncNodeId **ppId = taosHashIterate(nodeMap->nodeIdMap, pId);
if (ppId == NULL) {
return true;
}
*pId = *(*ppId);
return false;
}
bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgressMap* progressMap) {
SyncNodeId *pId = NULL;
while (!syncRaftIterateNodeMap(nodeMap, pId)) {
if (!syncRaftIsInProgressMap(progressMap, *pId)) {
return false;
}
}
return true;
}
void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
syncRaftCopyNodeMap(nodeMap, to);
}
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
taosHashPut(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*), &nodeId, sizeof(SyncNodeId*));
}
void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
taosHashRemove(nodeMap->nodeIdMap, &nodeId, sizeof(SyncNodeId*));
}
int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) {
return taosHashGetSize(nodeMap->nodeIdMap);
}
\ No newline at end of file
...@@ -20,18 +20,26 @@ ...@@ -20,18 +20,26 @@
#include "sync.h" #include "sync.h"
#include "syncInt.h" #include "syncInt.h"
static void copyProgress(SSyncRaftProgress* progress, void* arg);
static void refProgress(SSyncRaftProgress* progress);
static void unrefProgress(SSyncRaftProgress* progress, void*);
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state); static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state);
static void probeAcked(SSyncRaftProgress* progress); static void probeAcked(SSyncRaftProgress* progress);
static void resumeProgress(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress);
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
if (progress->inflights) {
syncRaftCloseInflights(progress->inflights);
}
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs); SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs);
if (inflights == NULL) { if (inflights == NULL) {
return; return;
} }
*progress = (SSyncRaftProgress) { *progress = (SSyncRaftProgress) {
.matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, .matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0,
.nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1,
.inflights = inflights, .inflights = inflights,
.isLearner = false, .isLearner = false,
...@@ -39,11 +47,9 @@ void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) ...@@ -39,11 +47,9 @@ void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress)
}; };
} }
/** // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from
* index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true.
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) {
bool updated = false; bool updated = false;
...@@ -58,27 +64,36 @@ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastInde ...@@ -58,27 +64,36 @@ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastInde
return updated; return updated;
} }
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index of the append message rejected by the follower, and
// the hint that we want to decrease to.
//
// Rejections can happen spuriously as messages are sent out of order or
// duplicated. In such cases, the rejection pertains to an index that the
// Progress already knows were previously acknowledged, and false is returned
// without changing the Progress.
//
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
// cleared for sending log entries.
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint) { SyncIndex rejected, SyncIndex matchHint) {
if (progress->state == PROGRESS_STATE_REPLICATE) { if (progress->state == PROGRESS_STATE_REPLICATE) {
/** // The rejection must be stale if the progress has matched and "rejected"
* the rejection must be stale if the progress has matched and "rejected" // is smaller than "match".
* is smaller than "match".
**/
if (rejected <= progress->matchIndex) { if (rejected <= progress->matchIndex) {
syncDebug("match index is up to date,ignore"); syncDebug("match index is up to date,ignore");
return false; return false;
} }
/* directly decrease next to match + 1 */ // Directly decrease next to match + 1.
//
// TODO(tbg): why not use matchHint if it's larger?
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
return true; return true;
} }
/** // The rejection must be stale if "rejected" does not match next - 1. This
* The rejection must be stale if "rejected" does not match next - 1. This // is because non-replicating followers are probed one entry at a time.
* is because non-replicating followers are probed one entry at a time.
**/
if (rejected != progress->nextIndex - 1) { if (rejected != progress->nextIndex - 1) {
syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore"
, rejected, progress->nextIndex); , rejected, progress->nextIndex);
...@@ -91,14 +106,12 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, ...@@ -91,14 +106,12 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
return true; return true;
} }
/** // IsPaused returns whether sending log entries to this node has been throttled.
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. // This is done when a node has rejected recent MsgApps, is currently waiting
* This is done when a node has rejected recent MsgApps, is currently waiting // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal // operation, this is false. A throttled node will be contacted less frequently
* operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of
* until it has reached a state in which it's able to accept a steady stream of // log entries again.
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
switch (progress->state) { switch (progress->state) {
case PROGRESS_STATE_PROBE: case PROGRESS_STATE_PROBE:
...@@ -112,58 +125,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { ...@@ -112,58 +125,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
} }
} }
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i; SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*));
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { if (ppProgress == NULL) {
if (progressMap->progress[i].id == id) { return NULL;
return i;
}
} }
return -1;
return *ppProgress;
} }
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress) {
int i, j; refProgress(progress);
taosHashPut(progressMap->progressMap, &progress->id, sizeof(SyncNodeId*), &progress, sizeof(SSyncRaftProgress*));
}
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
if (progressMap->progress[i].id == id) { SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*));
return i; if (ppProgress == NULL) {
} return;
if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) {
j = i;
}
} }
unrefProgress(*ppProgress, NULL);
assert(j != -1); taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*));
progressMap->progress[i].id = id;
} }
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i; return taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)) != NULL;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (progressMap->progress[i].id == id) {
progressMap->progress[i].id = SYNC_NON_NODE_ID;
break;
}
}
} }
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
} }
/** // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot.
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
/** // If the original state is StateSnapshot, progress knows that
* If the original state is ProgressStateSnapshot, progress knows that // the pending snapshot has been sent to this peer successfully, then
* the pending snapshot has been sent to this peer successfully, then // probes from pendingSnapshot + 1.
* probes from pendingSnapshot + 1.
**/
if (progress->state == PROGRESS_STATE_SNAPSHOT) { if (progress->state == PROGRESS_STATE_SNAPSHOT) {
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_STATE_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
...@@ -174,111 +173,88 @@ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { ...@@ -174,111 +173,88 @@ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
} }
} }
/** // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
resetProgressState(progress, PROGRESS_STATE_REPLICATE); resetProgressState(progress, PROGRESS_STATE_REPLICATE);
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
} }
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = snapshotIndex; progress->pendingSnapshotIndex = snapshotIndex;
} }
void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) { void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) {
memcpy(out, progress, sizeof(SSyncRaftProgress));
}
/**
* ResetState moves the Progress into the specified State, resetting ProbeSent,
* PendingSnapshot, and Inflights.
**/
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) {
progress->probeSent = false;
progress->pendingSnapshotIndex = 0;
progress->state = state;
syncRaftInflightReset(progress->inflights);
} }
/** void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap) {
* probeAcked is called when this peer has accepted an append. It resets progressMap->progressMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
* ProbeSent to signal that additional append messages should be sent without
* further delay.
**/
static void probeAcked(SSyncRaftProgress* progress) {
progress->probeSent = false;
} }
#if 0 void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap) {
syncRaftVisitProgressMap(progressMap, unrefProgress, NULL);
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) { taosHashCleanup(progressMap->progressMap);
return pRaft->leaderState.progress[i].nextIndex;
} }
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) { void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap) {
return pRaft->leaderState.progress[i].matchIndex; taosHashClear(progressMap->progressMap);
} }
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) { void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to) {
pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft); syncRaftVisitProgressMap(from, copyProgress, to);
} }
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) { bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress) {
pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft); SSyncRaftProgress **ppProgress = taosHashIterate(progressMap->progressMap, pProgress);
} if (ppProgress == NULL) {
return true;
}
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) { *pProgress = *(*ppProgress);
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); return false;
bool prev = progress->recentRecv;
progress->recentRecv = false;
return prev;
} }
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) { bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg) {
pRaft->leaderState.progress[i].recentRecv = true; SSyncRaftProgress *pProgress;
while (!syncRaftIterateProgressMap(progressMap, pProgress)) {
fp(pProgress, arg);
}
} }
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { static void copyProgress(SSyncRaftProgress* progress, void* arg) {
return pRaft->leaderState.progress[i].recentRecv; assert(progress->refCount > 0);
SSyncRaftProgressMap* to = (SSyncRaftProgressMap*)arg;
syncRaftAddToProgressMap(to, progress);
} }
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { static void refProgress(SSyncRaftProgress* progress) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); progress->refCount += 1;
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
} }
void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { static void unrefProgress(SSyncRaftProgress* progress, void* arg) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); (void)arg;
progress->refCount -= 1;
if (progress->state == PROGRESS_STATE_SNAPSHOT) { assert(progress->refCount >= 0);
assert(progress->pendingSnapshotIndex > 0); if (progress->refCount == 0) {
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; free(progress);
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
} else {
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1;
} }
} }
void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { // ResetState moves the Progress into the specified State, resetting ProbeSent,
resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE); // PendingSnapshot, and Inflights.
pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) {
} progress->probeSent = false;
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
progress->pendingSnapshotIndex = 0; progress->pendingSnapshotIndex = 0;
progress->state = PROGRESS_STATE_PROBE; progress->state = state;
syncRaftInflightReset(progress->inflights);
} }
ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { // ProbeAcked is called when this peer has accepted an append. It resets
return pRaft->leaderState.progress[i].state; // ProbeSent to signal that additional append messages should be sent without
// further delay.
static void probeAcked(SSyncRaftProgress* progress) {
progress->probeSent = false;
} }
#endif
\ No newline at end of file
...@@ -13,62 +13,99 @@ ...@@ -13,62 +13,99 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "raft.h"
#include "sync_const.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) {
SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker));
if (tracker == NULL) { if (tracker == NULL) {
return NULL; return NULL;
} }
tracker->votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
syncRaftInitTrackConfig(&tracker->config);
tracker->pRaft = pRaft;
tracker->maxInflightMsgs = kSyncRaftMaxInflghtMsgs;
return tracker; return tracker;
} }
void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config) {
syncRaftInitNodeMap(&config->learners);
syncRaftInitNodeMap(&config->learnersNext);
syncRaftInitQuorumJointConfig(&config->voters);
config->autoLeave = false;
}
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) {
syncRaftFreeNodeMap(&config->learners);
syncRaftFreeNodeMap(&config->learnersNext);
syncRaftFreeNodeMap(&config->voters.incoming);
syncRaftFreeNodeMap(&config->voters.outgoing);
}
// ResetVotes prepares for a new round of vote counting via recordVote.
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteType) * TSDB_MAX_REPLICA); taosHashClear(tracker->votesMap);
} }
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
int i; syncRaftVisitProgressMap(&tracker->progressMap, visit, arg);
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncRaftProgress* progress = &(tracker->progressMap.progress[i]);
visit(i, progress, arg);
}
} }
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { // RecordVote records that the node with the given id voted for this Raft
if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) { // instance if v == true (and declined it otherwise).
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) {
ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*));
if (pType != NULL) {
return; return;
} }
tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &grant, sizeof(bool*));
} }
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
memcpy(to, from, sizeof(SSyncRaftProgressTrackerConfig));
}
int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
// NB: intentionally allow the empty config. In production we'll never see a
// non-empty config (we prevent it from being created) but we will need to
// be able to *create* an initial config, for example during bootstrap (or
// during tests). Instead of having to hand-code this, we allow
// transitioning from an empty config into any other legal and non-empty
// config.
if (!syncRaftIsAllNodeInProgressMap(&config->voters.incoming, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->voters.outgoing, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->learners, progressMap)) return -1;
if (!syncRaftIsAllNodeInProgressMap(&config->learnersNext, progressMap)) return -1;
return 0;
} }
/** // TallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the // election outcome is known.
* election outcome is known.
**/
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
int i; SSyncRaftProgress* progress = NULL;
SSyncRaftProgress* progress;
int r, g; int r, g;
for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { // Make sure to populate granted/rejected correctly even if the Votes slice
progress = &(tracker->progressMap.progress[i]); // contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) {
if (progress->id == SYNC_NON_NODE_ID) { if (progress->id == SYNC_NON_NODE_ID) {
continue; continue;
} }
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { bool* v = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*));
if (v == NULL) {
continue; continue;
} }
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { if (*v) {
g++; g++;
} else { } else {
r++; r++;
...@@ -77,12 +114,43 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r ...@@ -77,12 +114,43 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
if (rejected) *rejected = r; if (rejected) *rejected = r;
if (granted) *granted = g; if (granted) *granted = g;
return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap);
}
void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters);
syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing);
syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners);
syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext);
cs->autoLeave = tracker->config.autoLeave;
} }
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { static void matchAckIndexer(SyncNodeId id, void* arg, SyncIndex* index) {
memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap)); SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)arg;
memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap)); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&tracker->progressMap, id);
memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap)); if (progress == NULL) {
memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap)); *index = 0;
return;
}
*index = progress->matchIndex;
}
// Committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker) {
return syncRaftJointConfigCommittedIndex(&tracker->config.voters, matchAckIndexer, tracker);
}
static void visitProgressActive(SSyncRaftProgress* progress, void* arg) {
SHashObj* votesMap = (SHashObj*)arg;
taosHashPut(votesMap, &progress->id, sizeof(SyncNodeId), &progress->recentActive, sizeof(bool));
}
// QuorumActive returns true if the quorum is active from the view of the local
// raft state machine. Otherwise, it returns false.
bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker) {
SHashObj* votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
syncRaftVisitProgressMap(&tracker->progressMap, visitProgressActive, votesMap);
return syncRaftVoteResult(&tracker->config.voters, votesMap) == SYNC_RAFT_VOTE_WON;
} }
\ No newline at end of file
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync_raft_node_map.h"
#include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_majority.h"
#include "sync_raft_quorum_joint.h" #include "sync_raft_quorum_joint.h"
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
...@@ -22,9 +23,9 @@ ...@@ -22,9 +23,9 @@
* a result indicating whether the vote is pending, lost, or won. A joint quorum * a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor. * requires both majority quorums to vote in favor.
**/ **/
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes) { ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap) {
ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votes); ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votesMap);
ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votes); ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votesMap);
if (r1 == r2) { if (r1 == r2) {
// If they agree, return the agreed state. // If they agree, return the agreed state.
...@@ -40,46 +41,35 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E ...@@ -40,46 +41,35 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E
return SYNC_RAFT_VOTE_PENDING; return SYNC_RAFT_VOTE_PENDING;
} }
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config) {
int i, min; syncRaftInitNodeMap(&config->incoming);
syncRaftInitNodeMap(&config->outgoing);
}
for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { void syncRaftFreeQuorumJointConfig(SSyncRaftQuorumJointConfig* config) {
if (config->incoming.nodeId[i] == id) { syncRaftFreeNodeMap(&config->incoming);
return; syncRaftFreeNodeMap(&config->outgoing);
} }
if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) {
min = i;
}
}
assert(min != -1); void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
config->incoming.nodeId[min] = id; syncRaftAddToNodeMap(&config->incoming, id);
config->incoming.replica += 1;
} }
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
int i; syncRaftRemoveFromNodeMap(&config->incoming, id);
}
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) {
if (config->incoming.nodeId[i] == id) { syncRaftCopyNodeMap(&config->incoming, nodeMap);
config->incoming.replica -= 1;
config->incoming.nodeId[i] = SYNC_NON_NODE_ID;
break;
}
}
assert(config->incoming.replica >= 0); syncRaftUnionNodeMap(&config->outgoing, nodeMap);
} }
SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg) {
SyncIndex index0, index1;
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { index0 = syncRaftMajorityConfigCommittedIndex(&config->incoming, indexer, arg);
int i; index1 = syncRaftMajorityConfigCommittedIndex(&config->outgoing, indexer, arg);
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (nodeId == nodeMap->nodeId[i]) {
return true;
}
}
return false; return index0 < index1 ? index0 : index1;
} }
\ No newline at end of file
...@@ -13,42 +13,109 @@ ...@@ -13,42 +13,109 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync_const.h"
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
#include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_majority.h"
#include "sync_raft_node_map.h"
/** // VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns // a result indicating whether the vote is pending (i.e. neither a quorum of
* a result indicating whether the vote is pending (i.e. neither a quorum of // yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a // quorum of no has been reached).
* quorum of no has been reached). ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) {
**/ int n = syncRaftNodeMapSize(config);
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) { if (n == 0) {
if (config->replica == 0) { // By convention, the elections on an empty config win. This comes in
// handy with joint quorums because it'll make a half-populated joint
// quorum behave like a majority quorum.
return SYNC_RAFT_VOTE_WON; return SYNC_RAFT_VOTE_WON;
} }
int i, g, r, missing; int i, g, r, missing;
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { i = g = r = missing = 0;
if (config->nodeId[i] == SYNC_NON_NODE_ID) { SyncNodeId* pId = NULL;
while (!syncRaftIterateNodeMap(config, pId)) {
const bool* v = (const bool*)taosHashGet(votesMap, pId, sizeof(SyncNodeId*));
if (v == NULL) {
missing += 1;
continue; continue;
} }
if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { if (*v) {
missing += 1;
} else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
g +=1; g +=1;
} else { } else {
r += 1; r += 1;
} }
} }
int quorum = config->replica / 2 + 1; int quorum = n / 2 + 1;
if (g >= quorum) { if (g >= quorum) {
return SYNC_RAFT_VOTE_WON; return SYNC_RAFT_VOTE_WON;
} }
if (r + missing >= quorum) { if (g + missing >= quorum) {
return SYNC_RAFT_VOTE_PENDING; return SYNC_RAFT_VOTE_PENDING;
} }
return SYNC_RAFT_VOTE_LOST; return SYNC_RAFT_VOTE_LOST;
}
int compSyncIndex(const void * elem1, const void * elem2) {
SyncIndex index1 = *((SyncIndex*)elem1);
SyncIndex index2 = *((SyncIndex*)elem1);
if (index1 > index2) return 1;
if (index1 < index2) return -1;
return 0;
}
SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg) {
int n = syncRaftNodeMapSize(config);
if (n == 0) {
// This plays well with joint quorums which, when one half is the zero
// MajorityConfig, should behave like the other half.
return kMaxCommitIndex;
}
// Use an on-stack slice to collect the committed indexes when n <= 7
// (otherwise we alloc). The alternative is to stash a slice on
// MajorityConfig, but this impairs usability (as is, MajorityConfig is just
// a map, and that's nice). The assumption is that running with a
// replication factor of >7 is rare, and in cases in which it happens
// performance is a lesser concern (additionally the performance
// implications of an allocation here are far from drastic).
SyncIndex* srt = NULL;
SyncIndex srk[TSDB_MAX_REPLICA];
if (n > TSDB_MAX_REPLICA) {
srt = (SyncIndex*)malloc(sizeof(SyncIndex) * n);
if (srt == NULL) {
return kMaxCommitIndex;
}
} else {
srt = &srk[0];
}
// Fill the slice with the indexes observed. Any unused slots will be
// left as zero; these correspond to voters that may report in, but
// haven't yet. We fill from the right (since the zeroes will end up on
// the left after sorting below anyway).
SyncNodeId *pId = NULL;
int i = 0;
SyncIndex index;
while (!syncRaftIterateNodeMap(config, pId)) {
indexer(*pId, arg, &index);
srt[i++] = index;
}
// Sort by index. Use a bespoke algorithm (copied from the stdlib's sort
// package) to keep srt on the stack.
qsort(srt, n, sizeof(SyncIndex), compSyncIndex);
// The smallest index into the array for which the value is acked by a
// quorum. In other words, from the end of the slice, move n/2+1 to the
// left (accounting for zero-indexing).
index = srt[n - (n/2 + 1)];
if (srt != &srk[0]) {
free(srt);
}
return index;
} }
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "sync_raft_restore.h" #include "sync_raft_restore.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t);
static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in);
// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
...@@ -27,21 +28,26 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA ...@@ -27,21 +28,26 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA
// the Changer only needs a ProgressMap (not a whole Tracker) at which point // the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up // this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone. // the results from that alone.
int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) { int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
SSyncConfChangeSingleArray outgoing; SSyncConfChangeSingleArray outgoing;
SSyncConfChangeSingleArray incoming; SSyncConfChangeSingleArray incoming;
SSyncConfChangeSingleArray css; SSyncConfChangeSingleArray css;
SSyncRaftProgressTracker* tracker = changer->tracker; SSyncRaftProgressTracker* tracker = changer->tracker;
SSyncRaftProgressTrackerConfig* config = &tracker->config;
SSyncRaftProgressMap* progressMap = &tracker->progressMap;
int i, ret; int i, ret;
syncRaftInitConfArray(&outgoing);
syncRaftInitConfArray(&incoming);
syncRaftInitTrackConfig(config);
syncRaftInitProgressMap(progressMap);
ret = toConfChangeSingle(cs, &outgoing, &incoming); ret = toConfChangeSingle(cs, &outgoing, &incoming);
if (ret != 0) { if (ret != 0) {
goto out; goto out;
} }
if (outgoing.n == 0) { if (syncRaftConfArrayIsEmpty(&outgoing)) {
// No outgoing config, so just apply the incoming changes one by one. // No outgoing config, so just apply the incoming changes one by one.
for (i = 0; i < incoming.n; ++i) { for (i = 0; i < incoming.n; ++i) {
css = (SSyncConfChangeSingleArray) { css = (SSyncConfChangeSingleArray) {
...@@ -52,6 +58,9 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) ...@@ -52,6 +58,9 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
if (ret != 0) { if (ret != 0) {
goto out; goto out;
} }
syncRaftCopyTrackerConfig(config, &changer->tracker->config);
syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap);
} }
} else { } else {
// The ConfState describes a joint configuration. // The ConfState describes a joint configuration.
...@@ -68,6 +77,8 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) ...@@ -68,6 +77,8 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
if (ret != 0) { if (ret != 0) {
goto out; goto out;
} }
syncRaftCopyTrackerConfig(config, &changer->tracker->config);
syncRaftCopyProgressMap(progressMap, &changer->tracker->progressMap);
} }
ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap); ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap);
...@@ -77,11 +88,24 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) ...@@ -77,11 +88,24 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
} }
out: out:
if (incoming.n != 0) free(incoming.changes); syncRaftFreeConfArray(&incoming);
if (outgoing.n != 0) free(outgoing.changes); syncRaftFreeConfArray(&outgoing);
return ret; return ret;
} }
static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t) {
SyncNodeId* pId = NULL;
while (!syncRaftIterateNodeMap(nodeMap, pId)) {
out->changes[*i] = (SSyncConfChangeSingle) {
.type = t,
.nodeId = *pId,
};
*i += 1;
}
}
// toConfChangeSingle translates a conf state into 1) a slice of operations creating // toConfChangeSingle translates a conf state into 1) a slice of operations creating
// first the config that will become the outgoing one, and then the incoming one, and // first the config that will become the outgoing one, and then the incoming one, and
// b) another slice that, when applied to the config resulted from 1), represents the // b) another slice that, when applied to the config resulted from 1), represents the
...@@ -89,15 +113,16 @@ out: ...@@ -89,15 +113,16 @@ out:
static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in) { static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in) {
int i; int i;
out->n = in->n = 0; out->n = syncRaftNodeMapSize(&cs->votersOutgoing);
out->n = cs->votersOutgoing.replica;
out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n);
if (out->changes == NULL) { if (out->changes == NULL) {
out->n = 0; out->n = 0;
return -1; return -1;
} }
in->n = cs->votersOutgoing.replica + cs->voters.replica + cs->learners.replica + cs->learnersNext.replica; in->n = syncRaftNodeMapSize(&cs->votersOutgoing) +
syncRaftNodeMapSize(&cs->voters) +
syncRaftNodeMapSize(&cs->learners) +
syncRaftNodeMapSize(&cs->learnersNext);
out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n);
if (in->changes == NULL) { if (in->changes == NULL) {
in->n = 0; in->n = 0;
...@@ -132,50 +157,24 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA ...@@ -132,50 +157,24 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA
// //
// as desired. // as desired.
for (i = 0; i < cs->votersOutgoing.replica; ++i) { // If there are outgoing voters, first add them one by one so that the
// If there are outgoing voters, first add them one by one so that the // (non-joint) config has them all.
// (non-joint) config has them all. i = 0;
out->changes[i] = (SSyncConfChangeSingle) { addToConfChangeSingleArray(out, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_AddNode);
.type = SYNC_RAFT_Conf_AddNode, assert(i == out->n);
.nodeId = cs->votersOutgoing.nodeId[i],
};
}
// We're done constructing the outgoing slice, now on to the incoming one // We're done constructing the outgoing slice, now on to the incoming one
// (which will apply on top of the config created by the outgoing slice). // (which will apply on top of the config created by the outgoing slice).
i = 0;
// First, we'll remove all of the outgoing voters. // First, we'll remove all of the outgoing voters.
int j = 0; addToConfChangeSingleArray(in, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_RemoveNode);
for (i = 0; i < cs->votersOutgoing.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_RemoveNode,
.nodeId = cs->votersOutgoing.nodeId[i],
};
j += 1;
}
// Then we'll add the incoming voters and learners. // Then we'll add the incoming voters and learners.
for (i = 0; i < cs->voters.replica; ++i) { addToConfChangeSingleArray(in, &i, &cs->voters, SYNC_RAFT_Conf_AddNode);
in->changes[j] = (SSyncConfChangeSingle) { addToConfChangeSingleArray(in, &i, &cs->learners, SYNC_RAFT_Conf_AddLearnerNode);
.type = SYNC_RAFT_Conf_AddNode, addToConfChangeSingleArray(in, &i, &cs->learnersNext, SYNC_RAFT_Conf_AddLearnerNode);
.nodeId = cs->voters.nodeId[i], assert(i == in->n);
};
j += 1;
}
for (i = 0; i < cs->learners.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_AddLearnerNode,
.nodeId = cs->learners.nodeId[i],
};
j += 1;
}
// Same for LearnersNext; these are nodes we want to be learners but which
// are currently voters in the outgoing config.
for (i = 0; i < cs->learnersNext.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_AddLearnerNode,
.nodeId = cs->learnersNext.nodeId[i],
};
j += 1;
}
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册