提交 68e6b82a 编写于 作者: L lichuang

[TD-10645][raft]<feature>add vote resp process

上级 ce654f83
......@@ -20,11 +20,11 @@
#include "syncInt.h"
#include "sync_type.h"
// syncRaftReplicate sends an append RPC with new entries to the given peer,
// syncRaftMaybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty);
bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty);
#endif /* TD_SYNC_RAFT_REPLICATION_H */
......@@ -28,6 +28,8 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType);
void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType);
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
......
......@@ -169,7 +169,7 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch
}
static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) {
syncRaftReplicate(arg, progress, false);
syncRaftMaybeSendAppend(arg, progress, false);
}
// switchToConfig reconfigures this node to use the provided configuration. It
......
......@@ -48,12 +48,14 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
}
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) {
return false;
}
if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) {
return false;
}
return true;
bool canVote =
// We can vote if this is a repeat of a vote we've already cast...
pRaft->voteFor == pMsg->from ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(pRaft->voteFor == SYNC_NON_NODE_ID && pRaft->leaderId == SYNC_NON_NODE_ID) ||
// ...or this is a PreVote for a future term...
(pMsg->vote.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION && pMsg->term > pRaft->term);
// ...and we believe the candidate is up to date.
return canVote && syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm);
}
\ No newline at end of file
......@@ -45,12 +45,14 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (result == SYNC_RAFT_VOTE_WON) {
if (pRaft->candidateState.inPreVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
syncRaftCampaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else {
syncRaftBecomeLeader(pRaft);
syncRaftBroadcastAppend(pRaft);
}
} else if (result == SYNC_RAFT_VOTE_LOST) {
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
}
......
......@@ -24,12 +24,12 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
SyncIndex prevIndex, SyncTerm prevTerm,
SSyncRaftEntry *entries, int nEntry);
// syncRaftReplicate sends an append RPC with new entries to the given peer,
// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) {
bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) {
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
SyncNodeId nodeId = progress->id;
......
......@@ -19,8 +19,6 @@
#include "raft_message.h"
#include "sync_raft_progress_tracker.h"
static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType);
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
if (pRaft->state == TAOS_SYNC_STATE_LEADER) {
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
......@@ -28,7 +26,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
}
if (!syncRaftIsPromotable(pRaft)) {
syncWarn("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId);
syncWarn("[%d:%d] is unpromotable and can not syncRaftCampaign", pRaft->selfGroupId, pRaft->selfId);
return;
}
......@@ -41,17 +39,17 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
campaign(pRaft, cType);
syncRaftCampaign(pRaft, cType);
}
// campaign transitions the raft instance to candidate state. This must only be
// syncRaftCampaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
bool preVote;
SyncTerm term;
if (syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
syncDebug("[%d:%d] is unpromotable; syncRaftCampaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
return;
}
......
......@@ -25,6 +25,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static int triggerAll(SSyncRaft* pRaft);
static void tickElection(SSyncRaft* pRaft);
......@@ -82,13 +84,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_STATE_LEADER;
// TODO: check if there is pending config log
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
if (nPendingConf > 1) {
syncFatal("unexpected multiple uncommitted config entry");
}
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
assert(progress != NULL);
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
syncRaftProgressBecomeReplicate(progress);
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
pRaft->pendingConfigIndex = lastIndex;
// after become leader, send a no-op log
SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
......@@ -103,6 +114,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
};
appendEntries(pRaft, entry, 1);
//syncRaftTriggerHeartbeat(pRaft);
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
......@@ -192,9 +204,11 @@ static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) {
return;
}
syncRaftReplicate(arg, progress, true);
syncRaftMaybeSendAppend(arg, progress, true);
}
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
}
......@@ -267,6 +281,11 @@ static void tickHeartbeat(SSyncRaft* pRaft) {
}
// TODO
static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
return false;
}
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm term = pRaft->term;
......@@ -277,9 +296,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
entries[i].index = lastIndex + 1 + i;
}
// Track the size of this uncommitted proposal.
if (!increaseUncommittedSize(pRaft, entries, n)) {
// Drop the proposal.
return;
}
syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
assert(progress != NULL);
syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend.
syncRaftMaybeCommit(pRaft);
......@@ -306,7 +332,7 @@ static int triggerAll(SSyncRaft* pRaft) {
continue;
}
syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true);
syncRaftMaybeSendAppend(pRaft, pRaft->tracker->progressMap.progress[i], true);
}
#endif
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册