提交 8ab1eb64 编写于 作者: L lichuang

[TD-10645][raft]<feature>refactor sync interface

上级 25219949
...@@ -119,15 +119,15 @@ typedef struct SStateManager { ...@@ -119,15 +119,15 @@ typedef struct SStateManager {
int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n);
// save serialized cluster state data, buffer will be free by Sync // save serialized cluster state data, buffer will be free by Sync
void (*saveCluster)(struct SStateManager* stateMng, const char* buffer, int n); void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n);
// read serialized cluster state data, buffer will be free by Sync // read serialized cluster state data, buffer will be free by Sync
int32_t (*readCluster)(struct SStateManager* stateMng, char** ppBuffer, int* n); int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n);
} SStateManager; } SStateManager;
typedef struct { typedef struct {
SyncGroupId vgId; SyncGroupId vgId;
SyncIndex snapshotIndex; SyncIndex appliedIndex;
SSyncCluster syncCfg; SSyncCluster syncCfg;
SSyncFSM fsm; SSyncFSM fsm;
SSyncLogStore logStore; SSyncLogStore logStore;
......
...@@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog); ...@@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog);
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog);
void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex);
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term);
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog);
......
...@@ -20,6 +20,11 @@ ...@@ -20,6 +20,11 @@
#include "syncInt.h" #include "syncInt.h"
#include "sync_type.h" #include "sync_type.h"
int syncRaftReplicate(SSyncRaft* pRaft, int i); // syncRaftReplicate sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty);
#endif /* TD_SYNC_RAFT_REPLICATION_H */ #endif /* TD_SYNC_RAFT_REPLICATION_H */
...@@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft); ...@@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft);
bool syncRaftMaybeCommit(SSyncRaft* pRaft);
ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool accept, bool preVote, bool accept,
int* rejectNum, int *granted); int* rejectNum, int *granted);
static FORCE_INLINE bool syncRaftIsEmptyServerState(const SSyncServerState* serverState) {
return serverState->commitIndex == 0 &&
serverState->term == SYNC_NON_TERM &&
serverState->voteFor == SYNC_NON_NODE_ID;
}
void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState);
void syncRaftBroadcastAppend(SSyncRaft* pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */
...@@ -58,11 +58,20 @@ typedef enum ESyncRaftProgressState { ...@@ -58,11 +58,20 @@ typedef enum ESyncRaftProgressState {
PROGRESS_STATE_SNAPSHOT, PROGRESS_STATE_SNAPSHOT,
} ESyncRaftProgressState; } ESyncRaftProgressState;
static const char* kProgressStateString[] = {
"Probe",
"Replicate",
"Snapshot",
};
/** /**
* Progress represents a follower’s progress in the view of the leader. Leader maintains * Progress represents a follower’s progress in the view of the leader. Leader maintains
* progresses of all followers, and sends entries to the follower based on its progress. * progresses of all followers, and sends entries to the follower based on its progress.
**/ **/
struct SSyncRaftProgress { struct SSyncRaftProgress {
// index in raft cluster config
int selfIndex;
SyncNodeId id; SyncNodeId id;
SyncIndex nextIndex; SyncIndex nextIndex;
...@@ -133,6 +142,11 @@ struct SSyncRaftProgressMap { ...@@ -133,6 +142,11 @@ struct SSyncRaftProgressMap {
SSyncRaftProgress progress[TSDB_MAX_REPLICA]; SSyncRaftProgress progress[TSDB_MAX_REPLICA];
}; };
static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) {
return kProgressStateString[progress->state];
}
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress);
/** /**
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
#include "sync_raft_quorum_joint.h" #include "sync_raft_quorum_joint.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
#include "sync_raft_proto.h"
struct SSyncRaftProgressTrackerConfig { struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters; SSyncRaftQuorumJointConfig voters;
...@@ -109,4 +110,6 @@ int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaf ...@@ -109,4 +110,6 @@ int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaf
**/ **/
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs);
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
* majority configurations. Decisions require the support of both majorities. * majority configurations. Decisions require the support of both majorities.
**/ **/
typedef struct SSyncRaftQuorumJointConfig { typedef struct SSyncRaftQuorumJointConfig {
SSyncCluster outgoing; SSyncRaftNodeMap outgoing;
SSyncCluster incoming; SSyncRaftNodeMap incoming;
} SSyncRaftQuorumJointConfig; } SSyncRaftQuorumJointConfig;
/** /**
......
...@@ -86,7 +86,7 @@ typedef enum { ...@@ -86,7 +86,7 @@ typedef enum {
// grant the vote request // grant the vote request
SYNC_RAFT_VOTE_RESP_GRANT = 1, SYNC_RAFT_VOTE_RESP_GRANT = 1,
//reject the vote request // reject the vote request
SYNC_RAFT_VOTE_RESP_REJECT = 2, SYNC_RAFT_VOTE_RESP_REJECT = 2,
} ESyncRaftVoteType; } ESyncRaftVoteType;
......
...@@ -16,14 +16,19 @@ ...@@ -16,14 +16,19 @@
#include "raft.h" #include "raft.h"
#include "raft_configuration.h" #include "raft_configuration.h"
#include "raft_log.h" #include "raft_log.h"
#include "sync_raft_restore.h"
#include "raft_replication.h" #include "raft_replication.h"
#include "sync_raft_config_change.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "syncInt.h" #include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100 #define RAFT_READ_LOG_MAX_NUM 100
static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n); static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n);
static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n); static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n);
static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config,
const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs);
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
...@@ -32,14 +37,15 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); ...@@ -32,14 +37,15 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncNode* pNode = pRaft->pNode; SSyncNode* pNode = pRaft->pNode;
SSyncServerState serverState; SSyncServerState serverState;
SSyncConfigState confState;
SStateManager* stateManager; SStateManager* stateManager;
SSyncLogStore* logStore; SSyncLogStore* logStore;
SSyncFSM* fsm; SSyncFSM* fsm;
SyncIndex initIndex = pInfo->snapshotIndex;
SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
int nBuf, limit, i; int nBuf, limit, i;
char* buf; char* buf;
int n; int n;
SSyncRaftChanger changer;
memset(pRaft, 0, sizeof(SSyncRaft)); memset(pRaft, 0, sizeof(SSyncRaft));
...@@ -70,8 +76,45 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -70,8 +76,45 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId); syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId);
return -1; return -1;
} }
free(buf);
//assert(initIndex <= serverState.commitIndex);
// read config state
if (stateManager->readClusterState(stateManager, &buf, &n) != 0) {
syncError("readClusterState for vgid %d fail", pInfo->vgId);
return -1;
}
if (deserializeClusterStateFromBuffer(&confState, buf, n) != 0) {
syncError("deserializeClusterStateFromBuffer for vgid %d fail", pInfo->vgId);
return -1;
}
free(buf);
changer = (SSyncRaftChanger) {
.tracker = pRaft->tracker,
.lastIndex = syncRaftLogLastIndex(pRaft->log),
};
if (syncRaftRestoreConfig(&changer, &confState) < 0) {
syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId);
return -1;
}
if (!syncRaftIsEmptyServerState(&serverState)) {
syncRaftLoadState(pRaft, &serverState);
}
if (pInfo->appliedIndex > 0) {
syncRaftLogAppliedTo(pRaft->log, pInfo->appliedIndex);
}
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
#if 0
assert(initIndex <= serverState.commitIndex);
// restore fsm state from snapshot index + 1 until commitIndex // restore fsm state from snapshot index + 1 until commitIndex
++initIndex; ++initIndex;
...@@ -96,6 +139,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -96,6 +139,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
pRaft->selfIndex = pRaft->cluster.selfIndex; pRaft->selfIndex = pRaft->cluster.selfIndex;
#endif
syncInfo("[%d:%d] restore vgid %d state: snapshot index success", syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
...@@ -133,10 +177,77 @@ static int deserializeServerStateFromBuffer(SSyncServerState* server, const char ...@@ -133,10 +177,77 @@ static int deserializeServerStateFromBuffer(SSyncServerState* server, const char
return 0; return 0;
} }
static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) { static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n) {
return 0; return 0;
} }
static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) {
syncRaftReplicate(arg, progress, false);
}
// switchToConfig reconfigures this node to use the provided configuration. It
// updates the in-memory state and, when necessary, carries out additional
// actions such as reacting to the removal of nodes or changed quorum
// requirements.
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config,
const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs) {
SyncNodeId selfId = pRaft->selfId;
int i;
bool exist;
SSyncRaftProgress* progress = NULL;
syncRaftConfigState(pRaft->tracker, cs);
i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId);
exist = (i != -1);
// Update whether the node itself is a learner, resetting to false when the
// node is removed.
if (exist) {
progress = &pRaft->tracker->progressMap.progress[i];
pRaft->isLearner = progress->isLearner;
} else {
pRaft->isLearner = false;
}
if ((!exist || pRaft->isLearner) && pRaft->state == TAOS_SYNC_STATE_LEADER) {
// This node is leader and was removed or demoted. We prevent demotions
// at the time writing but hypothetically we handle them the same way as
// removing the leader: stepping down into the next Term.
//
// TODO(tbg): step down (for sanity) and ask follower with largest Match
// to TimeoutNow (to avoid interruption). This might still drop some
// proposals but it's better than nothing.
//
// TODO(tbg): test this branch. It is untested at the time of writing.
return;
}
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) {
return;
}
if (syncRaftMaybeCommit(pRaft)) {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
syncRaftBroadcastAppend(pRaft);
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
syncRaftProgressVisit(pRaft->tracker, visitProgressMaybeSendAppend, pRaft);
// If the the leadTransferee was removed or demoted, abort the leadership transfer.
SyncNodeId leadTransferee = pRaft->leadTransferee;
if (leadTransferee != SYNC_NON_NODE_ID) {
}
}
}
/** /**
* pre-handle message, return true means no need to continue * pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower. * Handle the message term, which may result in our stepping down to a follower.
......
...@@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs ...@@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
return 0; return 0;
} }
RaftMsg_Append_Entries *appendResp = &(pRespMsg->appendResp); RaftMsg_Append_Resp *appendResp = &(pRespMsg->appendResp);
// ignore committed logs // ignore committed logs
if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) { if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) {
appendResp->index = pRaft->log->commitIndex; appendResp->index = pRaft->log->commitIndex;
......
...@@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) { ...@@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return 0; return 0;
} }
void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex) {
}
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) { bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) {
return true; return true;
} }
......
...@@ -16,106 +16,62 @@ ...@@ -16,106 +16,62 @@
#include "raft.h" #include "raft.h"
#include "raft_log.h" #include "raft_log.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
#include "syncInt.h"
#include "raft_replication.h" #include "raft_replication.h"
static int sendSnapshot(SSyncRaft* pRaft, int i); static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress);
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term); static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
SyncIndex prevIndex, SyncTerm prevTerm,
int syncRaftReplicate(SSyncRaft* pRaft, int i) { const SSyncRaftEntry *entries, int nEntry);
#if 0
// syncRaftReplicate sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) {
assert(pRaft->state == TAOS_SYNC_STATE_LEADER); assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
assert(i >= 0 && i < pRaft->leaderState.nProgress); SyncNodeId nodeId = progress->id;
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (syncRaftProgressIsPaused(progress)) { if (syncRaftProgressIsPaused(progress)) {
syncInfo("node %d paused", nodeId); syncInfo("node [%d:%d] paused", pRaft->selfGroupId, nodeId);
return 0; return false;
} }
SyncIndex nextIndex = syncRaftProgressNextIndex(progress); SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log); SSyncRaftEntry *entries;
bool inSnapshot = syncRaftProgressInSnapshot(progress); int nEntry;
SyncIndex prevIndex; SyncIndex prevIndex;
SyncTerm prevTerm; SyncTerm prevTerm;
/** prevIndex = nextIndex - 1;
* From Section 3.5: prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
* int ret = syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
* When sending an AppendEntries RPC, the leader includes the index and
* term of the entry in its log that immediately precedes the new
* entries. If the follower does not find an entry in its log with the
* same index and term, then it refuses the new entries. The consistency
* check acts as an induction step: the initial empty state of the logs
* satisfies the Log Matching Property, and the consistency check
* preserves the Log Matching Property whenever logs are extended. As a
* result, whenever AppendEntries returns successfully, the leader knows
* that the follower's log is identical to its own log up through the new
* entries (Log Matching Property in Figure 3.2).
**/
if (nextIndex == 1) {
/**
* We're including the very first entry, so prevIndex and prevTerm are
* null. If the first entry is not available anymore, send the last
* snapshot if we're not already sending one.
**/
if (snapshotIndex > 0 && !inSnapshot) {
goto send_snapshot;
}
// otherwise send append entries from start if (nEntry == 0 && !sendIfEmpty) {
prevIndex = 0; return false;
prevTerm = 0;
} else {
/**
* Set prevIndex and prevTerm to the index and term of the entry at
* nextIndex - 1.
**/
prevIndex = nextIndex - 1;
prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
/**
* If the entry is not anymore in our log, send the last snapshot if we're
* not doing so already.
**/
if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
goto send_snapshot;
}
} }
/* Send empty AppendEntries RPC when installing a snaphot */ if (ret != 0 || prevTerm == SYNC_NON_TERM) {
if (inSnapshot) { return sendSnapshot(pRaft, progress);
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
} }
return sendAppendEntries(pRaft, i, prevIndex, prevTerm); return sendAppendEntries(pRaft, progress, prevIndex, prevTerm, entries, nEntry);
send_snapshot:
if (syncRaftProgressRecentActive(progress)) {
/* Only send a snapshot when we have heard from the server */
return sendSnapshot(pRaft, i);
} else {
/* Send empty AppendEntries RPC when we haven't heard from the server */
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
}
#endif
return 0;
} }
static int sendSnapshot(SSyncRaft* pRaft, int i) { static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return 0; if (!syncRaftProgressRecentActive(progress)) {
return false;
}
return true;
} }
static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
#if 0 SyncIndex prevIndex, SyncTerm prevTerm,
SyncIndex nextIndex = prevIndex + 1; const SSyncRaftEntry *entries, int nEntry) {
SSyncRaftEntry *entries; SyncIndex lastIndex;
int nEntry; SyncTerm logTerm = prevTerm;
SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]); SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]);
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
prevIndex, prevTerm, pRaft->log->commitIndex, prevIndex, prevTerm, pRaft->log->commitIndex,
...@@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT ...@@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
goto err_release_log; goto err_release_log;
} }
pRaft->io.send(msg, pNode); if (nEntry != 0) {
switch (progress->state) {
if (syncRaftProgressInReplicate(progress)) { // optimistically increase the next when in StateReplicate
SyncIndex lastIndex = nextIndex + nEntry; case PROGRESS_STATE_REPLICATE:
syncRaftProgressOptimisticNextIndex(progress, lastIndex); lastIndex = entries[nEntry - 1].index;
syncRaftInflightAdd(&progress->inflights, lastIndex); syncRaftProgressOptimisticNextIndex(progress, lastIndex);
} else if (syncRaftProgressInProbe(progress)) { syncRaftInflightAdd(&progress->inflights, lastIndex);
syncRaftProgressPause(progress); break;
} else { case PROGRESS_STATE_PROBE:
progress->probeSent = true;
break;
default:
syncFatal("[%d:%d] is sending append in unhandled state %s",
pRaft->selfGroupId, pRaft->selfId, syncRaftProgressStateString(progress));
break;
}
} }
pRaft->io.send(msg, pNode);
syncRaftProgressUpdateSendTick(progress, pRaft->currentTick); return true;
return 0;
err_release_log: err_release_log:
syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry); syncRaftLogRelease(pRaft->log, prevIndex + 1, entries, nEntry);
#endif return false;
return 0; }
}
\ No newline at end of file
...@@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft); ...@@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft); static void tickHeartbeat(SSyncRaft* pRaft);
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static bool maybeCommit(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft); static void abortLeaderTransfer(SSyncRaft* pRaft);
...@@ -171,6 +170,25 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, ...@@ -171,6 +170,25 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
return granted; return granted;
*/ */
void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
SyncIndex commitIndex = serverState->commitIndex;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
if (commitIndex < pRaft->log->commitIndex || commitIndex > lastIndex) {
syncFatal("[%d:%d] state.commit %"PRId64" is out of range [%" PRId64 ",%" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, commitIndex, pRaft->log->commitIndex, lastIndex);
return;
}
pRaft->log->commitIndex = commitIndex;
pRaft->term = serverState->term;
pRaft->voteFor = serverState->voteFor;
}
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
}
static int convertClear(SSyncRaft* pRaft) { static int convertClear(SSyncRaft* pRaft) {
} }
...@@ -245,16 +263,14 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { ...@@ -245,16 +263,14 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]);
syncRaftProgressMaybeUpdate(progress, lastIndex); syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of maybeCommit's return, our caller will call bcastAppend. // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend.
maybeCommit(pRaft); syncRaftMaybeCommit(pRaft);
} }
/** // syncRaftMaybeCommit attempts to advance the commit index. Returns true if
* maybeCommit attempts to advance the commit index. Returns true if // the commit index changed (in which case the caller should call
* the commit index changed (in which case the caller should call // r.bcastAppend).
* r.bcastAppend). bool syncRaftMaybeCommit(SSyncRaft* pRaft) {
**/
static bool maybeCommit(SSyncRaft* pRaft) {
return true; return true;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "sync_raft_proto.h"
SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker));
...@@ -77,4 +78,11 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r ...@@ -77,4 +78,11 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
if (rejected) *rejected = r; if (rejected) *rejected = r;
if (granted) *granted = g; if (granted) *granted = g;
return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); return syncRaftVoteResult(&(tracker->config.voters), tracker->votes);
}
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap));
memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap));
memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap));
memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap));
} }
\ No newline at end of file
...@@ -48,7 +48,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) ...@@ -48,7 +48,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
.n = 1, .n = 1,
.changes = &incoming.changes[i], .changes = &incoming.changes[i],
}; };
ret = syncRaftChangerSimpleConfig(changer, &css, &config, &progressMap); ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap);
if (ret != 0) { if (ret != 0) {
goto out; goto out;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册