/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef TD_SYNC_RAFT_PROGRESS_H #define TD_SYNC_RAFT_PROGRESS_H #include "sync_type.h" #include "sync_raft_inflights.h" #include "thash.h" /** * State defines how the leader should interact with the follower. * * When in PROGRESS_STATE_PROBE, leader sends at most one replication message * per heartbeat interval. It also probes actual progress of the follower. * * When in PROGRESS_STATE_REPLICATE, leader optimistically increases next * to the latest entry sent after sending replication message. This is * an optimized state for fast replicating log entries to the follower. * * When in PROGRESS_STATE_SNAPSHOT, leader should have sent out snapshot * before and stops sending any replication message. * * PROGRESS_STATE_PROBE is the initial state. **/ typedef enum ESyncRaftProgressState { /** * StateProbe indicates a follower whose last index isn't known. Such a * follower is "probed" (i.e. an append sent periodically) to narrow down * its last index. In the ideal (and common) case, only one round of probing * is necessary as the follower will react with a hint. Followers that are * probed over extended periods of time are often offline. **/ PROGRESS_STATE_PROBE = 0, /** * StateReplicate is the state steady in which a follower eagerly receives * log entries to append to its log. **/ PROGRESS_STATE_REPLICATE, /** * StateSnapshot indicates a follower that needs log entries not available * from the leader's Raft log. Such a follower needs a full snapshot to * return to StateReplicate. **/ PROGRESS_STATE_SNAPSHOT, } ESyncRaftProgressState; static const char* kProgressStateString[] = { "Probe", "Replicate", "Snapshot", }; // Progress represents a follower’s progress in the view of the leader. Leader // maintains progresses of all followers, and sends entries to the follower // based on its progress. // // NB(tbg): Progress is basically a state machine whose transitions are mostly // strewn around `*raft.raft`. Additionally, some fields are only used when in a // certain State. All of this isn't ideal. struct SSyncRaftProgress { SyncGroupId groupId; SyncNodeId id; int16_t refCount; SyncIndex nextIndex; SyncIndex matchIndex; // State defines how the leader should interact with the follower. // // When in StateProbe, leader sends at most one replication message // per heartbeat interval. It also probes actual progress of the follower. // // When in StateReplicate, leader optimistically increases next // to the latest entry sent after sending replication message. This is // an optimized state for fast replicating log entries to the follower. // // When in StateSnapshot, leader should have sent out snapshot // before and stops sending any replication message. ESyncRaftProgressState state; // PendingSnapshot is used in StateSnapshot. // If there is a pending snapshot, the pendingSnapshot will be set to the // index of the snapshot. If pendingSnapshot is set, the replication process of // this Progress will be paused. raft will not resend snapshot until the pending one // is reported to be failed. SyncIndex pendingSnapshotIndex; // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. // // TODO(tbg): the leader should always have this set to true. bool recentActive; // ProbeSent is used while this follower is in StateProbe. When ProbeSent is // true, raft should pause sending replication message to this peer until // ProbeSent is reset. See ProbeAcked() and IsPaused(). bool probeSent; // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. // The max number of entries per message is defined in raft config as MaxSizePerMsg. // Thus inflight effectively limits both the number of inflight messages // and the bandwidth each Progress can use. // When inflights is Full, no more message should be sent. // When a leader sends out a message, the index of the last // entry should be added to inflights. The index MUST be added // into inflights in order. // When a leader receives a reply, the previous inflights should // be freed by calling inflights.FreeLE with the index of the last // received entry. SSyncRaftInflights* inflights; // IsLearner is true if this progress is tracked for a learner. bool isLearner; }; struct SSyncRaftProgressMap { // map nodeId -> SSyncRaftProgress* SHashObj* progressMap; }; static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) { return kProgressStateString[progress->state]; } void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot. void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); // OptimisticUpdate signals that appends all the way up to and including index n // are in-flight. As a result, Next is increased to n+1. static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { progress->nextIndex = nextIndex + 1; } // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to. // // Rejections can happen spuriously as messages are sent out of order or // duplicated. In such cases, the rejection pertains to an index that the // Progress already knows were previously acknowledged, and false is returned // without changing the Progress. // // If the rejection is genuine, Next is lowered sensibly, and the Progress is // cleared for sending log entries. bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint); // IsPaused returns whether sending log entries to this node has been throttled. // This is done when a node has rejected recent MsgApps, is currently waiting // for a snapshot, or has reached the MaxInflightMsgs limit. In normal // operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of // log entries again. bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { return progress->nextIndex; } static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_REPLICATE; } static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_SNAPSHOT; } static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_PROBE; } static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) { return progress->recentActive; } void syncRaftInitProgressMap(SSyncRaftProgressMap* progressMap); void syncRaftFreeProgressMap(SSyncRaftProgressMap* progressMap); void syncRaftClearProgressMap(SSyncRaftProgressMap* progressMap); void syncRaftCopyProgressMap(SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgress* progress); void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); /** * return true if progress's log is up-todate **/ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending // snapshot index. void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); // return true if reach the end bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress); bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg); #if 0 void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i); void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i); void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i); bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i); void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i); void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); #endif #endif /* TD_SYNC_RAFT_PROGRESS_H */