diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 795ea7cc999117a287ea95043e7bff75af315952..c8bf63f81cf2c0e8e34d154cc297d43c5d2c4cf6 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -44,6 +44,7 @@ struct SSyncRaft { SSyncCluster cluster; + int selfIndex; SyncNodeId selfId; SyncGroupId selfGroupId; @@ -113,9 +114,6 @@ struct SSyncRaft { **/ uint16_t heartbeatElapsed; - // current tick count since start up - uint32_t currentTick; - bool preVote; bool checkQuorum; @@ -130,6 +128,9 @@ struct SSyncRaft { int randomizedElectionTimeout; bool disableProposalForwarding; + // current tick count since start up + uint32_t currentTick; + SyncRaftStepFp stepFp; SyncRaftTickFp tickFp; diff --git a/source/libs/sync/inc/raft_progress.h b/source/libs/sync/inc/raft_progress.h deleted file mode 100644 index 41d66d59d096b57ec867d22b375790d800cf1496..0000000000000000000000000000000000000000 --- a/source/libs/sync/inc/raft_progress.h +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_SYNC_RAFT_PROGRESS_H -#define TD_SYNC_RAFT_PROGRESS_H - -#include "sync_type.h" - -/** - * SSyncRaftInflights is a sliding window for the inflight messages. - * Thus inflight effectively limits both the number of inflight messages - * and the bandwidth each Progress can use. - * When inflights is full, no more message should be sent. - * When a leader sends out a message, the index of the last - * entry should be added to inflights. The index MUST be added - * into inflights in order. - * When a leader receives a reply, the previous inflights should - * be freed by calling syncRaftInflightFreeTo with the index of the last - * received entry. - **/ -typedef struct SSyncRaftInflights { - /* the starting index in the buffer */ - int start; - - /* number of inflights in the buffer */ - int count; - - /* the size of the buffer */ - int size; - - /** - * buffer contains the index of the last entry - * inside one message. - **/ - SyncIndex* buffer; -} SSyncRaftInflights; - -/** - * State defines how the leader should interact with the follower. - * - * When in PROGRESS_PROBE, leader sends at most one replication message - * per heartbeat interval. It also probes actual progress of the follower. - * - * When in PROGRESS_REPLICATE, leader optimistically increases next - * to the latest entry sent after sending replication message. This is - * an optimized state for fast replicating log entries to the follower. - * - * When in PROGRESS_SNAPSHOT, leader should have sent out snapshot - * before and stops sending any replication message. - * - * PROGRESS_PROBE is the initial state. - **/ -typedef enum RaftProgressState { - PROGRESS_PROBE = 0, - PROGRESS_REPLICATE, - PROGRESS_SNAPSHOT, -} RaftProgressState; - -/** - * Progress represents a follower’s progress in the view of the leader. Leader maintains - * progresses of all followers, and sends entries to the follower based on its progress. - **/ -struct SSyncRaftProgress { - SyncNodeId id; - - SyncIndex nextIndex; - - SyncIndex matchIndex; - - RaftProgressState state; - - /** - * paused is used in PROGRESS_PROBE. - * When paused is true, raft should pause sending replication message to this peer. - **/ - bool paused; - - // last send append message tick - uint32_t lastSendTick; - - /** - * pendingSnapshotIndex is used in PROGRESS_SNAPSHOT. - * If there is a pending snapshot, the pendingSnapshotIndex will be set to the - * index of the snapshot. If pendingSnapshotIndex is set, the replication process of - * this Progress will be paused. raft will not resend snapshot until the pending one - * is reported to be failed. - **/ - SyncIndex pendingSnapshotIndex; - - /** - * recentActive is true if the progress is recently active. Receiving any messages - * from the corresponding follower indicates the progress is active. - * RecentActive can be reset to false after an election timeout. - **/ - bool recentActive; - - /** - * flow control sliding window - **/ - SSyncRaftInflights inflights; - - // IsLearner is true if this progress is tracked for a learner. - bool isLearner; -}; - -void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); - -/** - * syncRaftProgressMaybeUpdate returns false if the given lastIndex index comes from i-th node's log. - * Otherwise it updates the progress and returns true. - **/ -bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); - -static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { - progress->nextIndex = nextIndex + 1; -} - -/** - * syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message. - * Otherwise it decreases the progress next index to min(rejected, last) and returns true. - **/ -bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, - SyncIndex rejected, SyncIndex lastIndex); - -/** - * syncRaftProgressIsPaused returns whether sending log entries to this node has been - * paused. A node may be paused because it has rejected recent - * MsgApps, is currently waiting for a snapshot, or has reached the - * MaxInflightMsgs limit. - **/ -bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); - -static FORCE_INLINE void syncRaftProgressPause(SSyncRaftProgress* progress) { - progress->paused = true; -} - -static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { - return progress->nextIndex; -} - -static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { - return progress->state == PROGRESS_REPLICATE; -} - -static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { - return progress->state == PROGRESS_SNAPSHOT; -} - -static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { - return progress->state == PROGRESS_PROBE; -} - -static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) { - return progress->recentActive; -} - -static FORCE_INLINE bool syncRaftProgressUpdateSendTick(SSyncRaftProgress* progress, SyncTick current) { - return progress->lastSendTick = current; -} - -void syncRaftProgressFailure(SSyncRaftProgress* progress); - -bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress); - -/** - * return true if progress's log is up-todate - **/ -bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); - -void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); - -void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); - -void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); - -/* inflights APIs */ -int syncRaftInflightReset(SSyncRaftInflights* inflights); -bool syncRaftInflightFull(SSyncRaftInflights* inflights); -void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); -void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex); -void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights); - -#if 0 - -void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); - - - -SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i); - -void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i); - -void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i); - -bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i); - -void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i); - - - -void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); - -#endif - -#endif /* TD_SYNC_RAFT_PROGRESS_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_inflights.h b/source/libs/sync/inc/sync_raft_inflights.h new file mode 100644 index 0000000000000000000000000000000000000000..6d249c9274452d28604c335dce649833b225ea6e --- /dev/null +++ b/source/libs/sync/inc/sync_raft_inflights.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_INFLIGHTS_H +#define TD_SYNC_RAFT_INFLIGHTS_H + +#include "sync.h" + +/** + * SSyncRaftInflights limits the number of MsgApp (represented by the largest index + * contained within) sent to followers but not yet acknowledged by them. Callers + * use syncRaftInflightFull() to check whether more messages can be sent, + * call syncRaftInflightAdd() whenever they are sending a new append, + * and release "quota" via FreeLE() whenever an ack is received. +**/ +typedef struct SSyncRaftInflights { + /* the starting index in the buffer */ + int start; + + /* number of inflights in the buffer */ + int count; + + /* the size of the buffer */ + int size; + + /** + * buffer contains the index of the last entry + * inside one message. + **/ + SyncIndex* buffer; +} SSyncRaftInflights; + +SSyncRaftInflights* syncRaftOpenInflights(int size); +void syncRaftCloseInflights(SSyncRaftInflights*); + +static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { + inflights->count = 0; + inflights->start = 0; +} + +static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { + return inflights->count == inflights->size; +} + +/** + * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being + * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() + * to verify that there is room for one more message, + * and consecutive calls to add syncRaftInflightAdd() must provide a + * monotonic sequence of indexes. + **/ +void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); + +/** + * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. + **/ +void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); + +/** + * syncRaftInflightFreeFirstOne releases the first inflight. + * This is a no-op if nothing is inflight. + **/ +void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights); + +#endif /* TD_SYNC_RAFT_INFLIGHTS_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h new file mode 100644 index 0000000000000000000000000000000000000000..1f693219be059d35ff3febc69e981f3d5db74d67 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_PROGRESS_H +#define TD_SYNC_RAFT_PROGRESS_H + +#include "sync_type.h" +#include "sync_raft_inflights.h" + +/** + * State defines how the leader should interact with the follower. + * + * When in PROGRESS_STATE_PROBE, leader sends at most one replication message + * per heartbeat interval. It also probes actual progress of the follower. + * + * When in PROGRESS_STATE_REPLICATE, leader optimistically increases next + * to the latest entry sent after sending replication message. This is + * an optimized state for fast replicating log entries to the follower. + * + * When in PROGRESS_STATE_SNAPSHOT, leader should have sent out snapshot + * before and stops sending any replication message. + * + * PROGRESS_STATE_PROBE is the initial state. + **/ +typedef enum RaftProgressState { + /** + * StateProbe indicates a follower whose last index isn't known. Such a + * follower is "probed" (i.e. an append sent periodically) to narrow down + * its last index. In the ideal (and common) case, only one round of probing + * is necessary as the follower will react with a hint. Followers that are + * probed over extended periods of time are often offline. + **/ + PROGRESS_STATE_PROBE = 0, + + /** + * StateReplicate is the state steady in which a follower eagerly receives + * log entries to append to its log. + **/ + PROGRESS_STATE_REPLICATE, + + /** + * StateSnapshot indicates a follower that needs log entries not available + * from the leader's Raft log. Such a follower needs a full snapshot to + * return to StateReplicate. + **/ + PROGRESS_STATE_SNAPSHOT, +} RaftProgressState; + +/** + * Progress represents a follower’s progress in the view of the leader. Leader maintains + * progresses of all followers, and sends entries to the follower based on its progress. + **/ +struct SSyncRaftProgress { + SyncIndex nextIndex; + + SyncIndex matchIndex; + + /** + * State defines how the leader should interact with the follower. + * + * When in StateProbe, leader sends at most one replication message + * per heartbeat interval. It also probes actual progress of the follower. + * + * When in StateReplicate, leader optimistically increases next + * to the latest entry sent after sending replication message. This is + * an optimized state for fast replicating log entries to the follower. + * + * When in StateSnapshot, leader should have sent out snapshot + * before and stops sending any replication message. + **/ + RaftProgressState state; + + /** + * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. + * If there is a pending snapshot, the pendingSnapshotIndex will be set to the + * index of the snapshot. If pendingSnapshotIndex is set, the replication process of + * this Progress will be paused. raft will not resend snapshot until the pending one + * is reported to be failed. + **/ + SyncIndex pendingSnapshotIndex; + + /** + * recentActive is true if the progress is recently active. Receiving any messages + * from the corresponding follower indicates the progress is active. + * RecentActive can be reset to false after an election timeout. + **/ + bool recentActive; + + /** + * probeSent is used while this follower is in StateProbe. When probeSent is + * true, raft should pause sending replication message to this peer until + * probeSent is reset. See ProbeAcked() and IsPaused(). + **/ + bool probeSent; + + /** + * inflights is a sliding window for the inflight messages. + * Each inflight message contains one or more log entries. + * The max number of entries per message is defined in raft config as MaxSizePerMsg. + * Thus inflight effectively limits both the number of inflight messages + * and the bandwidth each Progress can use. + * When inflights is Full, no more message should be sent. + * When a leader sends out a message, the index of the last + * entry should be added to inflights. The index MUST be added + * into inflights in order. + * When a leader receives a reply, the previous inflights should + * be freed by calling inflights.FreeLE with the index of the last + * received entry. + **/ + SSyncRaftInflights* inflights; + + /** + * IsLearner is true if this progress is tracked for a learner. + **/ + bool isLearner; +}; + +void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); + +/** + * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, + * optionally and if larger, the index of the pending snapshot. + **/ +void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); + +/** + * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. + **/ +void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); + +/** + * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the + * index acked by it. The method returns false if the given n index comes from + * an outdated message. Otherwise it updates the progress and returns true. + **/ +bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); + +/** + * syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n + * are in-flight. As a result, Next is increased to n+1. + **/ +static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { + progress->nextIndex = nextIndex + 1; +} + +/** + * syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The + * arguments are the index of the append message rejected by the follower, and + * the hint that we want to decrease to. + * + * Rejections can happen spuriously as messages are sent out of order or + * duplicated. In such cases, the rejection pertains to an index that the + * Progress already knows were previously acknowledged, and false is returned + * without changing the Progress. + * + * If the rejection is genuine, Next is lowered sensibly, and the Progress is + * cleared for sending log entries. +**/ +bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, + SyncIndex rejected, SyncIndex matchHint); + +/** + * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. + * This is done when a node has rejected recent MsgApps, is currently waiting + * for a snapshot, or has reached the MaxInflightMsgs limit. In normal + * operation, this is false. A throttled node will be contacted less frequently + * until it has reached a state in which it's able to accept a steady stream of + * log entries again. + **/ +bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); + +static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { + return progress->nextIndex; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_STATE_REPLICATE; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_STATE_SNAPSHOT; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_STATE_PROBE; +} + +static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) { + return progress->recentActive; +} + +/** + * return true if progress's log is up-todate + **/ +bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); + +void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); + + + +#if 0 + +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); + + + +SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i); + +void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i); + +void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i); + +bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i); + +void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i); + + + +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); + +#endif + +#endif /* TD_SYNC_RAFT_PROGRESS_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index ffc134fec497a527f2c57a41a937dd072b62c5b9..40d43895c89c55015608eecdec994c26bff8b2ff 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -18,7 +18,7 @@ #include "sync_type.h" #include "sync_raft_quorum_joint.h" -#include "raft_progress.h" +#include "sync_raft_progress.h" struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -94,7 +94,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); void syncRaftResetVotes(SSyncRaftProgressTracker*); -typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); +typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 4a3654131cdd21b2b22067b0d705687ee39519e1..b43a35c03e5bbf0733cb709cd16c7a7072427df6 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -102,6 +102,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); + pRaft->selfIndex = pRaft->cluster.selfIndex; + syncInfo("[%d:%d] restore vgid %d state: snapshot index success", pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); return 0; @@ -443,8 +445,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { pRaft->leadTransferee = SYNC_NON_NODE_ID; } -static void initProgress(SSyncRaftProgress* progress, void* arg) { - syncRaftInitProgress((SSyncRaft*)arg, progress); +static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { + syncRaftInitProgress(i, (SSyncRaft*)arg, progress); } static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 473499b7950fdf6346de3d2094255f6d211cc14d..3c7216239ae1297491d8240ae549b5c753faaa7c 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -15,7 +15,7 @@ #include "raft.h" #include "raft_log.h" -#include "raft_progress.h" +#include "sync_raft_progress.h" #include "raft_replication.h" static int sendSnapshot(SSyncRaft* pRaft, int i); diff --git a/source/libs/sync/src/sync_raft_inflights.c b/source/libs/sync/src/sync_raft_inflights.c new file mode 100644 index 0000000000000000000000000000000000000000..3d740b5a9e18f8b8461c9db15c3f635da50b55a9 --- /dev/null +++ b/source/libs/sync/src/sync_raft_inflights.c @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_inflights.h" + +SSyncRaftInflights* syncRaftOpenInflights(int size) { + SSyncRaftInflights* inflights = (SSyncRaftInflights*)malloc(sizeof(SSyncRaftInflights)); + if (inflights == NULL) { + return NULL; + } + SyncIndex* buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * size); + if (buffer == NULL) { + free(inflights); + return NULL; + } + *inflights = (SSyncRaftInflights) { + .buffer = buffer, + .count = 0, + .size = 0, + .start = 0, + }; + + return inflights; +} + +void syncRaftCloseInflights(SSyncRaftInflights* inflights) { + free(inflights->buffer); + free(inflights); +} + +/** + * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being + * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() + * to verify that there is room for one more message, + * and consecutive calls to add syncRaftInflightAdd() must provide a + * monotonic sequence of indexes. + **/ +void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { + assert(!syncRaftInflightFull(inflights)); + + int next = inflights->start + inflights->count; + int size = inflights->size; + /* is next wrapped around buffer? */ + if (next >= size) { + next -= size; + } + + inflights->buffer[next] = inflightIndex; + inflights->count++; +} + +/** + * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. + **/ +void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { + if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { + /* out of the left side of the window */ + return; + } + + int i, idx; + for (i = 0, idx = inflights->start; i < inflights->count; i++) { + if (toIndex < inflights->buffer[idx]) { // found the first large inflight + break; + } + + // increase index and maybe rotate + int size = inflights->size; + idx++; + if (idx >= size) { + idx -= size; + } + } + + // free i inflights and set new start index + inflights->count -= i; + inflights->start = idx; + assert(inflights->count >= 0); + if (inflights->count == 0) { + // inflights is empty, reset the start index so that we don't grow the + // buffer unnecessarily. + inflights->start = 0; + } +} + +/** + * syncRaftInflightFreeFirstOne releases the first inflight. + * This is a no-op if nothing is inflight. + **/ +void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { + syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]); +} diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/sync_raft_progress.c similarity index 59% rename from source/libs/sync/src/raft_progress.c rename to source/libs/sync/src/sync_raft_progress.c index 6edc808698b893f6e66bffc9a2d2152f6bc47047..ec98be7dfa06a4d1a60a332093cb521c80cee381 100644 --- a/source/libs/sync/src/raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -15,57 +15,50 @@ #include "raft.h" #include "raft_log.h" -#include "raft_progress.h" +#include "sync_raft_progress.h" +#include "sync_raft_progress_tracker.h" #include "sync.h" #include "syncInt.h" static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state); +static void probeAcked(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress); -int syncRaftProgressCreate(SSyncRaft* pRaft) { - -/* - inflights->buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * pRaft->maxInflightMsgs); - if (inflights->buffer == NULL) { - return RAFT_OOM; +void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { + SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight); + if (inflights == NULL) { + return; } - inflights->size = pRaft->maxInflightMsgs; -*/ -} - -/* -int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration) { - -} -*/ - -void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) { *progress = (SSyncRaftProgress) { - .matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0, + .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, - //.inflights = + .inflights = inflights, }; } +/** + * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the + * index acked by it. The method returns false if the given n index comes from + * an outdated message. Otherwise it updates the progress and returns true. + **/ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool updated = false; if (progress->matchIndex < lastIndex) { progress->matchIndex = lastIndex; updated = true; - resumeProgress(progress); - } - if (progress->nextIndex < lastIndex + 1) { - progress->nextIndex = lastIndex + 1; + probeAcked(progress); } + progress->nextIndex = MAX(progress->nextIndex, lastIndex + 1); + return updated; } bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, - SyncIndex rejected, SyncIndex lastIndex) { - if (progress->state == PROGRESS_REPLICATE) { + SyncIndex rejected, SyncIndex matchHint) { + if (progress->state == PROGRESS_STATE_REPLICATE) { /** * the rejection must be stale if the progress has matched and "rejected" * is smaller than "match". @@ -77,143 +70,102 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, /* directly decrease next to match + 1 */ progress->nextIndex = progress->matchIndex + 1; - //syncRaftProgressBecomeProbe(raft, i); return true; } + /** + * The rejection must be stale if "rejected" does not match next - 1. This + * is because non-replicating followers are probed one entry at a time. + **/ if (rejected != progress->nextIndex - 1) { syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" , rejected, progress->nextIndex); return false; } - progress->nextIndex = MIN(rejected, lastIndex + 1); - if (progress->nextIndex < 1) { - progress->nextIndex = 1; - } + progress->nextIndex = MAX(MIN(rejected, matchHint + 1), 1); - resumeProgress(progress); + progress->probeSent = false; return true; } -static void resumeProgress(SSyncRaftProgress* progress) { - progress->paused = false; -} - +/** + * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. + * This is done when a node has rejected recent MsgApps, is currently waiting + * for a snapshot, or has reached the MaxInflightMsgs limit. In normal + * operation, this is false. A throttled node will be contacted less frequently + * until it has reached a state in which it's able to accept a steady stream of + * log entries again. + **/ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { switch (progress->state) { - case PROGRESS_PROBE: - return progress->paused; - case PROGRESS_REPLICATE: - return syncRaftInflightFull(&progress->inflights); - case PROGRESS_SNAPSHOT: + case PROGRESS_STATE_PROBE: + return progress->probeSent; + case PROGRESS_STATE_REPLICATE: + return syncRaftInflightFull(progress->inflights); + case PROGRESS_STATE_SNAPSHOT: return true; default: syncFatal("error sync state:%d", progress->state); } } -void syncRaftProgressFailure(SSyncRaftProgress* progress) { - progress->pendingSnapshotIndex = 0; -} - -bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress) { - return progress->state == PROGRESS_SNAPSHOT && progress->matchIndex >= progress->pendingSnapshotIndex; -} - bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } +/** + * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, + * optionally and if larger, the index of the pending snapshot. + **/ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { /** * If the original state is ProgressStateSnapshot, progress knows that * the pending snapshot has been sent to this peer successfully, then * probes from pendingSnapshot + 1. **/ - if (progress->state == PROGRESS_SNAPSHOT) { + if (progress->state == PROGRESS_STATE_SNAPSHOT) { SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; - resetProgressState(progress, PROGRESS_PROBE); + resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1); } else { - resetProgressState(progress, PROGRESS_PROBE); + resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = progress->matchIndex + 1; } } +/** + * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. + **/ void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { - resetProgressState(progress, PROGRESS_REPLICATE); + resetProgressState(progress, PROGRESS_STATE_REPLICATE); progress->nextIndex = progress->matchIndex + 1; } void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { - resetProgressState(progress, PROGRESS_SNAPSHOT); + resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = snapshotIndex; } -int syncRaftInflightReset(SSyncRaftInflights* inflights) { - inflights->count = 0; - inflights->start = 0; - - return 0; -} - -bool syncRaftInflightFull(SSyncRaftInflights* inflights) { - return inflights->count == inflights->size; -} - -void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { - assert(!syncRaftInflightFull(inflights)); - - int next = inflights->start + inflights->count; - int size = inflights->size; - /* is next wrapped around buffer? */ - if (next >= size) { - next -= size; - } - - inflights->buffer[next] = inflightIndex; - inflights->count++; -} - -void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex) { - if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { - return; - } - - int i, idx; - for (i = 0, idx = inflights->start; i < inflights->count; i++) { - if (toIndex < inflights->buffer[idx]) { - break; - } - - int size = inflights->size; - idx++; - if (idx >= size) { - idx -= size; - } - } - - inflights->count -= i; - inflights->start = idx; - assert(inflights->count >= 0); - if (inflights->count == 0) { - inflights->start = 0; - } -} - -void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { - syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]); -} - +/** + * ResetState moves the Progress into the specified State, resetting ProbeSent, + * PendingSnapshot, and Inflights. + **/ static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { - progress->paused = false; + progress->probeSent = false; progress->pendingSnapshotIndex = 0; progress->state = state; - syncRaftInflightReset(&(progress->inflights)); + syncRaftInflightReset(progress->inflights); } - +/** + * probeAcked is called when this peer has accepted an append. It resets + * ProbeSent to signal that additional append messages should be sent without + * further delay. + **/ +static void probeAcked(SSyncRaftProgress* progress) { + progress->probeSent = false; +} #if 0 @@ -250,33 +202,33 @@ bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - resetProgressState(progress, PROGRESS_SNAPSHOT); + resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); } void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - if (progress->state == PROGRESS_SNAPSHOT) { + if (progress->state == PROGRESS_STATE_SNAPSHOT) { assert(progress->pendingSnapshotIndex > 0); SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; - resetProgressState(progress, PROGRESS_PROBE); + resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); } else { - resetProgressState(progress, PROGRESS_PROBE); + resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = progress->matchIndex + 1; } } void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { - resetProgressState(pRaft->leaderState.progress, PROGRESS_REPLICATE); + resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE); pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; } void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); progress->pendingSnapshotIndex = 0; - progress->state = PROGRESS_PROBE; + progress->state = PROGRESS_STATE_PROBE; } RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index d349cbb9b2a6a1ef47f3a0041a29dfbf86bc47c4..7104794cbbc42de4c35a85daf9552cc0273b3fcf 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -32,10 +32,6 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi int i; for (i = 0; i < TSDB_MAX_REPLICA; ++i) { SSyncRaftProgress* progress = &(tracker->progressMap[i]); - if (progress->id == SYNC_NON_NODE_ID) { - continue; - } - - visit(progress, arg); + visit(i, progress, arg); } } \ No newline at end of file