提交 e17f573e 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft progress tracker

上级 ccf8f14f
...@@ -44,6 +44,7 @@ struct SSyncRaft { ...@@ -44,6 +44,7 @@ struct SSyncRaft {
SSyncCluster cluster; SSyncCluster cluster;
int selfIndex;
SyncNodeId selfId; SyncNodeId selfId;
SyncGroupId selfGroupId; SyncGroupId selfGroupId;
...@@ -113,9 +114,6 @@ struct SSyncRaft { ...@@ -113,9 +114,6 @@ struct SSyncRaft {
**/ **/
uint16_t heartbeatElapsed; uint16_t heartbeatElapsed;
// current tick count since start up
uint32_t currentTick;
bool preVote; bool preVote;
bool checkQuorum; bool checkQuorum;
...@@ -130,6 +128,9 @@ struct SSyncRaft { ...@@ -130,6 +128,9 @@ struct SSyncRaft {
int randomizedElectionTimeout; int randomizedElectionTimeout;
bool disableProposalForwarding; bool disableProposalForwarding;
// current tick count since start up
uint32_t currentTick;
SyncRaftStepFp stepFp; SyncRaftStepFp stepFp;
SyncRaftTickFp tickFp; SyncRaftTickFp tickFp;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_INFLIGHTS_H
#define TD_SYNC_RAFT_INFLIGHTS_H
#include "sync.h"
/**
* SSyncRaftInflights limits the number of MsgApp (represented by the largest index
* contained within) sent to followers but not yet acknowledged by them. Callers
* use syncRaftInflightFull() to check whether more messages can be sent,
* call syncRaftInflightAdd() whenever they are sending a new append,
* and release "quota" via FreeLE() whenever an ack is received.
**/
typedef struct SSyncRaftInflights {
/* the starting index in the buffer */
int start;
/* number of inflights in the buffer */
int count;
/* the size of the buffer */
int size;
/**
* buffer contains the index of the last entry
* inside one message.
**/
SyncIndex* buffer;
} SSyncRaftInflights;
SSyncRaftInflights* syncRaftOpenInflights(int size);
void syncRaftCloseInflights(SSyncRaftInflights*);
static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) {
inflights->count = 0;
inflights->start = 0;
}
static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
return inflights->count == inflights->size;
}
/**
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd()
* to verify that there is room for one more message,
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
/**
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex);
/**
* syncRaftInflightFreeFirstOne releases the first inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights);
#endif /* TD_SYNC_RAFT_INFLIGHTS_H */
\ No newline at end of file
...@@ -10,62 +10,52 @@ ...@@ -10,62 +10,52 @@
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE.
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/ */
#ifndef TD_SYNC_RAFT_PROGRESS_H #ifndef TD_SYNC_RAFT_PROGRESS_H
#define TD_SYNC_RAFT_PROGRESS_H #define TD_SYNC_RAFT_PROGRESS_H
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_inflights.h"
/**
* SSyncRaftInflights is a sliding window for the inflight messages.
* Thus inflight effectively limits both the number of inflight messages
* and the bandwidth each Progress can use.
* When inflights is full, no more message should be sent.
* When a leader sends out a message, the index of the last
* entry should be added to inflights. The index MUST be added
* into inflights in order.
* When a leader receives a reply, the previous inflights should
* be freed by calling syncRaftInflightFreeTo with the index of the last
* received entry.
**/
typedef struct SSyncRaftInflights {
/* the starting index in the buffer */
int start;
/* number of inflights in the buffer */
int count;
/* the size of the buffer */
int size;
/**
* buffer contains the index of the last entry
* inside one message.
**/
SyncIndex* buffer;
} SSyncRaftInflights;
/** /**
* State defines how the leader should interact with the follower. * State defines how the leader should interact with the follower.
* *
* When in PROGRESS_PROBE, leader sends at most one replication message * When in PROGRESS_STATE_PROBE, leader sends at most one replication message
* per heartbeat interval. It also probes actual progress of the follower. * per heartbeat interval. It also probes actual progress of the follower.
* *
* When in PROGRESS_REPLICATE, leader optimistically increases next * When in PROGRESS_STATE_REPLICATE, leader optimistically increases next
* to the latest entry sent after sending replication message. This is * to the latest entry sent after sending replication message. This is
* an optimized state for fast replicating log entries to the follower. * an optimized state for fast replicating log entries to the follower.
* *
* When in PROGRESS_SNAPSHOT, leader should have sent out snapshot * When in PROGRESS_STATE_SNAPSHOT, leader should have sent out snapshot
* before and stops sending any replication message. * before and stops sending any replication message.
* *
* PROGRESS_PROBE is the initial state. * PROGRESS_STATE_PROBE is the initial state.
**/ **/
typedef enum RaftProgressState { typedef enum RaftProgressState {
PROGRESS_PROBE = 0, /**
PROGRESS_REPLICATE, * StateProbe indicates a follower whose last index isn't known. Such a
PROGRESS_SNAPSHOT, * follower is "probed" (i.e. an append sent periodically) to narrow down
* its last index. In the ideal (and common) case, only one round of probing
* is necessary as the follower will react with a hint. Followers that are
* probed over extended periods of time are often offline.
**/
PROGRESS_STATE_PROBE = 0,
/**
* StateReplicate is the state steady in which a follower eagerly receives
* log entries to append to its log.
**/
PROGRESS_STATE_REPLICATE,
/**
* StateSnapshot indicates a follower that needs log entries not available
* from the leader's Raft log. Such a follower needs a full snapshot to
* return to StateReplicate.
**/
PROGRESS_STATE_SNAPSHOT,
} RaftProgressState; } RaftProgressState;
/** /**
...@@ -73,25 +63,27 @@ typedef enum RaftProgressState { ...@@ -73,25 +63,27 @@ typedef enum RaftProgressState {
* progresses of all followers, and sends entries to the follower based on its progress. * progresses of all followers, and sends entries to the follower based on its progress.
**/ **/
struct SSyncRaftProgress { struct SSyncRaftProgress {
SyncNodeId id;
SyncIndex nextIndex; SyncIndex nextIndex;
SyncIndex matchIndex; SyncIndex matchIndex;
/**
* State defines how the leader should interact with the follower.
*
* When in StateProbe, leader sends at most one replication message
* per heartbeat interval. It also probes actual progress of the follower.
*
* When in StateReplicate, leader optimistically increases next
* to the latest entry sent after sending replication message. This is
* an optimized state for fast replicating log entries to the follower.
*
* When in StateSnapshot, leader should have sent out snapshot
* before and stops sending any replication message.
**/
RaftProgressState state; RaftProgressState state;
/**
* paused is used in PROGRESS_PROBE.
* When paused is true, raft should pause sending replication message to this peer.
**/
bool paused;
// last send append message tick
uint32_t lastSendTick;
/** /**
* pendingSnapshotIndex is used in PROGRESS_SNAPSHOT. * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the * If there is a pending snapshot, the pendingSnapshotIndex will be set to the
* index of the snapshot. If pendingSnapshotIndex is set, the replication process of * index of the snapshot. If pendingSnapshotIndex is set, the replication process of
* this Progress will be paused. raft will not resend snapshot until the pending one * this Progress will be paused. raft will not resend snapshot until the pending one
...@@ -107,90 +99,116 @@ struct SSyncRaftProgress { ...@@ -107,90 +99,116 @@ struct SSyncRaftProgress {
bool recentActive; bool recentActive;
/** /**
* flow control sliding window * probeSent is used while this follower is in StateProbe. When probeSent is
**/ * true, raft should pause sending replication message to this peer until
SSyncRaftInflights inflights; * probeSent is reset. See ProbeAcked() and IsPaused().
**/
bool probeSent;
/**
* inflights is a sliding window for the inflight messages.
* Each inflight message contains one or more log entries.
* The max number of entries per message is defined in raft config as MaxSizePerMsg.
* Thus inflight effectively limits both the number of inflight messages
* and the bandwidth each Progress can use.
* When inflights is Full, no more message should be sent.
* When a leader sends out a message, the index of the last
* entry should be added to inflights. The index MUST be added
* into inflights in order.
* When a leader receives a reply, the previous inflights should
* be freed by calling inflights.FreeLE with the index of the last
* received entry.
**/
SSyncRaftInflights* inflights;
// IsLearner is true if this progress is tracked for a learner. /**
* IsLearner is true if this progress is tracked for a learner.
**/
bool isLearner; bool isLearner;
}; };
void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress);
/**
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress);
/**
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress);
/** /**
* syncRaftProgressMaybeUpdate returns false if the given lastIndex index comes from i-th node's log. * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* Otherwise it updates the progress and returns true. * index acked by it. The method returns false if the given n index comes from
* an outdated message. Otherwise it updates the progress and returns true.
**/ **/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex);
/**
* syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n
* are in-flight. As a result, Next is increased to n+1.
**/
static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) {
progress->nextIndex = nextIndex + 1; progress->nextIndex = nextIndex + 1;
} }
/** /**
* syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message. * syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
* Otherwise it decreases the progress next index to min(rejected, last) and returns true. * arguments are the index of the append message rejected by the follower, and
**/ * the hint that we want to decrease to.
*
* Rejections can happen spuriously as messages are sent out of order or
* duplicated. In such cases, the rejection pertains to an index that the
* Progress already knows were previously acknowledged, and false is returned
* without changing the Progress.
*
* If the rejection is genuine, Next is lowered sensibly, and the Progress is
* cleared for sending log entries.
**/
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex lastIndex); SyncIndex rejected, SyncIndex matchHint);
/** /**
* syncRaftProgressIsPaused returns whether sending log entries to this node has been * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled.
* paused. A node may be paused because it has rejected recent * This is done when a node has rejected recent MsgApps, is currently waiting
* MsgApps, is currently waiting for a snapshot, or has reached the * for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* MaxInflightMsgs limit. * operation, this is false. A throttled node will be contacted less frequently
* until it has reached a state in which it's able to accept a steady stream of
* log entries again.
**/ **/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); bool syncRaftProgressIsPaused(SSyncRaftProgress* progress);
static FORCE_INLINE void syncRaftProgressPause(SSyncRaftProgress* progress) {
progress->paused = true;
}
static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) {
return progress->nextIndex; return progress->nextIndex;
} }
static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_REPLICATE; return progress->state == PROGRESS_STATE_REPLICATE;
} }
static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_SNAPSHOT; return progress->state == PROGRESS_STATE_SNAPSHOT;
} }
static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_PROBE; return progress->state == PROGRESS_STATE_PROBE;
} }
static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) { static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) {
return progress->recentActive; return progress->recentActive;
} }
static FORCE_INLINE bool syncRaftProgressUpdateSendTick(SSyncRaftProgress* progress, SyncTick current) {
return progress->lastSendTick = current;
}
void syncRaftProgressFailure(SSyncRaftProgress* progress);
bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress);
/** /**
* return true if progress's log is up-todate * return true if progress's log is up-todate
**/ **/
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress);
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress);
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex);
/* inflights APIs */
int syncRaftInflightReset(SSyncRaftInflights* inflights);
bool syncRaftInflightFull(SSyncRaftInflights* inflights);
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex);
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights);
#if 0 #if 0
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_quorum_joint.h" #include "sync_raft_quorum_joint.h"
#include "raft_progress.h" #include "sync_raft_progress.h"
struct SSyncRaftProgressTrackerConfig { struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters; SSyncRaftQuorumJointConfig voters;
...@@ -94,7 +94,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); ...@@ -94,7 +94,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker();
void syncRaftResetVotes(SSyncRaftProgressTracker*); void syncRaftResetVotes(SSyncRaftProgressTracker*);
typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
...@@ -102,6 +102,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -102,6 +102,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
pRaft->selfIndex = pRaft->cluster.selfIndex;
syncInfo("[%d:%d] restore vgid %d state: snapshot index success", syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
return 0; return 0;
...@@ -443,8 +445,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { ...@@ -443,8 +445,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID; pRaft->leadTransferee = SYNC_NON_NODE_ID;
} }
static void initProgress(SSyncRaftProgress* progress, void* arg) { static void initProgress(int i, SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress((SSyncRaft*)arg, progress); syncRaftInitProgress(i, (SSyncRaft*)arg, progress);
} }
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "raft.h" #include "raft.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_progress.h" #include "sync_raft_progress.h"
#include "raft_replication.h" #include "raft_replication.h"
static int sendSnapshot(SSyncRaft* pRaft, int i); static int sendSnapshot(SSyncRaft* pRaft, int i);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http: *www.gnu.org/licenses/>.
*/
#include "sync_raft_inflights.h"
SSyncRaftInflights* syncRaftOpenInflights(int size) {
SSyncRaftInflights* inflights = (SSyncRaftInflights*)malloc(sizeof(SSyncRaftInflights));
if (inflights == NULL) {
return NULL;
}
SyncIndex* buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * size);
if (buffer == NULL) {
free(inflights);
return NULL;
}
*inflights = (SSyncRaftInflights) {
.buffer = buffer,
.count = 0,
.size = 0,
.start = 0,
};
return inflights;
}
void syncRaftCloseInflights(SSyncRaftInflights* inflights) {
free(inflights->buffer);
free(inflights);
}
/**
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd()
* to verify that there is room for one more message,
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
assert(!syncRaftInflightFull(inflights));
int next = inflights->start + inflights->count;
int size = inflights->size;
/* is next wrapped around buffer? */
if (next >= size) {
next -= size;
}
inflights->buffer[next] = inflightIndex;
inflights->count++;
}
/**
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
/* out of the left side of the window */
return;
}
int i, idx;
for (i = 0, idx = inflights->start; i < inflights->count; i++) {
if (toIndex < inflights->buffer[idx]) { // found the first large inflight
break;
}
// increase index and maybe rotate
int size = inflights->size;
idx++;
if (idx >= size) {
idx -= size;
}
}
// free i inflights and set new start index
inflights->count -= i;
inflights->start = idx;
assert(inflights->count >= 0);
if (inflights->count == 0) {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
inflights->start = 0;
}
}
/**
* syncRaftInflightFreeFirstOne releases the first inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]);
}
...@@ -15,57 +15,50 @@ ...@@ -15,57 +15,50 @@
#include "raft.h" #include "raft.h"
#include "raft_log.h" #include "raft_log.h"
#include "raft_progress.h" #include "sync_raft_progress.h"
#include "sync_raft_progress_tracker.h"
#include "sync.h" #include "sync.h"
#include "syncInt.h" #include "syncInt.h"
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state); static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state);
static void probeAcked(SSyncRaftProgress* progress);
static void resumeProgress(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress);
int syncRaftProgressCreate(SSyncRaft* pRaft) { void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) {
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight);
/* if (inflights == NULL) {
inflights->buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * pRaft->maxInflightMsgs); return;
if (inflights->buffer == NULL) {
return RAFT_OOM;
} }
inflights->size = pRaft->maxInflightMsgs;
*/
}
/*
int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration) {
}
*/
void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
*progress = (SSyncRaftProgress) { *progress = (SSyncRaftProgress) {
.matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0, .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0,
.nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1,
//.inflights = .inflights = inflights,
}; };
} }
/**
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* index acked by it. The method returns false if the given n index comes from
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) {
bool updated = false; bool updated = false;
if (progress->matchIndex < lastIndex) { if (progress->matchIndex < lastIndex) {
progress->matchIndex = lastIndex; progress->matchIndex = lastIndex;
updated = true; updated = true;
resumeProgress(progress); probeAcked(progress);
}
if (progress->nextIndex < lastIndex + 1) {
progress->nextIndex = lastIndex + 1;
} }
progress->nextIndex = MAX(progress->nextIndex, lastIndex + 1);
return updated; return updated;
} }
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex lastIndex) { SyncIndex rejected, SyncIndex matchHint) {
if (progress->state == PROGRESS_REPLICATE) { if (progress->state == PROGRESS_STATE_REPLICATE) {
/** /**
* the rejection must be stale if the progress has matched and "rejected" * the rejection must be stale if the progress has matched and "rejected"
* is smaller than "match". * is smaller than "match".
...@@ -77,143 +70,102 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, ...@@ -77,143 +70,102 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
/* directly decrease next to match + 1 */ /* directly decrease next to match + 1 */
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
//syncRaftProgressBecomeProbe(raft, i);
return true; return true;
} }
/**
* The rejection must be stale if "rejected" does not match next - 1. This
* is because non-replicating followers are probed one entry at a time.
**/
if (rejected != progress->nextIndex - 1) { if (rejected != progress->nextIndex - 1) {
syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore"
, rejected, progress->nextIndex); , rejected, progress->nextIndex);
return false; return false;
} }
progress->nextIndex = MIN(rejected, lastIndex + 1); progress->nextIndex = MAX(MIN(rejected, matchHint + 1), 1);
if (progress->nextIndex < 1) {
progress->nextIndex = 1;
}
resumeProgress(progress); progress->probeSent = false;
return true; return true;
} }
static void resumeProgress(SSyncRaftProgress* progress) { /**
progress->paused = false; * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled.
} * This is done when a node has rejected recent MsgApps, is currently waiting
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* operation, this is false. A throttled node will be contacted less frequently
* until it has reached a state in which it's able to accept a steady stream of
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
switch (progress->state) { switch (progress->state) {
case PROGRESS_PROBE: case PROGRESS_STATE_PROBE:
return progress->paused; return progress->probeSent;
case PROGRESS_REPLICATE: case PROGRESS_STATE_REPLICATE:
return syncRaftInflightFull(&progress->inflights); return syncRaftInflightFull(progress->inflights);
case PROGRESS_SNAPSHOT: case PROGRESS_STATE_SNAPSHOT:
return true; return true;
default: default:
syncFatal("error sync state:%d", progress->state); syncFatal("error sync state:%d", progress->state);
} }
} }
void syncRaftProgressFailure(SSyncRaftProgress* progress) {
progress->pendingSnapshotIndex = 0;
}
bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress) {
return progress->state == PROGRESS_SNAPSHOT && progress->matchIndex >= progress->pendingSnapshotIndex;
}
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
} }
/**
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
/** /**
* If the original state is ProgressStateSnapshot, progress knows that * If the original state is ProgressStateSnapshot, progress knows that
* the pending snapshot has been sent to this peer successfully, then * the pending snapshot has been sent to this peer successfully, then
* probes from pendingSnapshot + 1. * probes from pendingSnapshot + 1.
**/ **/
if (progress->state == PROGRESS_SNAPSHOT) { if (progress->state == PROGRESS_STATE_SNAPSHOT) {
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1); progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1);
} else { } else {
resetProgressState(progress, PROGRESS_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
} }
} }
/**
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
resetProgressState(progress, PROGRESS_REPLICATE); resetProgressState(progress, PROGRESS_STATE_REPLICATE);
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
} }
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
resetProgressState(progress, PROGRESS_SNAPSHOT); resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = snapshotIndex; progress->pendingSnapshotIndex = snapshotIndex;
} }
int syncRaftInflightReset(SSyncRaftInflights* inflights) { /**
inflights->count = 0; * ResetState moves the Progress into the specified State, resetting ProbeSent,
inflights->start = 0; * PendingSnapshot, and Inflights.
**/
return 0;
}
bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
return inflights->count == inflights->size;
}
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
assert(!syncRaftInflightFull(inflights));
int next = inflights->start + inflights->count;
int size = inflights->size;
/* is next wrapped around buffer? */
if (next >= size) {
next -= size;
}
inflights->buffer[next] = inflightIndex;
inflights->count++;
}
void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex) {
if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
return;
}
int i, idx;
for (i = 0, idx = inflights->start; i < inflights->count; i++) {
if (toIndex < inflights->buffer[idx]) {
break;
}
int size = inflights->size;
idx++;
if (idx >= size) {
idx -= size;
}
}
inflights->count -= i;
inflights->start = idx;
assert(inflights->count >= 0);
if (inflights->count == 0) {
inflights->start = 0;
}
}
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]);
}
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
progress->paused = false; progress->probeSent = false;
progress->pendingSnapshotIndex = 0; progress->pendingSnapshotIndex = 0;
progress->state = state; progress->state = state;
syncRaftInflightReset(&(progress->inflights)); syncRaftInflightReset(progress->inflights);
} }
/**
* probeAcked is called when this peer has accepted an append. It resets
* ProbeSent to signal that additional append messages should be sent without
* further delay.
**/
static void probeAcked(SSyncRaftProgress* progress) {
progress->probeSent = false;
}
#if 0 #if 0
...@@ -250,33 +202,33 @@ bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { ...@@ -250,33 +202,33 @@ bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
resetProgressState(progress, PROGRESS_SNAPSHOT); resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
} }
void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (progress->state == PROGRESS_SNAPSHOT) { if (progress->state == PROGRESS_STATE_SNAPSHOT) {
assert(progress->pendingSnapshotIndex > 0); assert(progress->pendingSnapshotIndex > 0);
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
} else { } else {
resetProgressState(progress, PROGRESS_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
} }
} }
void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
resetProgressState(pRaft->leaderState.progress, PROGRESS_REPLICATE); resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE);
pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1;
} }
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
progress->pendingSnapshotIndex = 0; progress->pendingSnapshotIndex = 0;
progress->state = PROGRESS_PROBE; progress->state = PROGRESS_STATE_PROBE;
} }
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
......
...@@ -32,10 +32,6 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi ...@@ -32,10 +32,6 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi
int i; int i;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncRaftProgress* progress = &(tracker->progressMap[i]); SSyncRaftProgress* progress = &(tracker->progressMap[i]);
if (progress->id == SYNC_NON_NODE_ID) { visit(i, progress, arg);
continue;
}
visit(progress, arg);
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册