提交 de3164f1 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft progress tracker

上级 e17f573e
......@@ -19,7 +19,8 @@
#include "sync.h"
#include "sync_type.h"
#include "raft_message.h"
#include "sync_raft_impl.h"
#include "sync_raft_quorum.h"
typedef struct RaftLeaderState {
......@@ -140,20 +141,4 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType);
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft);
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool accept, int* rejectNum);
#endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_IMPL_H
#define _TD_LIBS_SYNC_RAFT_IMPL_H
#include "sync.h"
#include "sync_type.h"
#include "raft_message.h"
#include "sync_raft_quorum.h"
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType);
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft);
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool accept,
int* rejectNum, int *granted);
#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */
......@@ -63,6 +63,8 @@ typedef enum RaftProgressState {
* progresses of all followers, and sends entries to the follower based on its progress.
**/
struct SSyncRaftProgress {
SyncNodeId id;
SyncIndex nextIndex;
SyncIndex matchIndex;
......
......@@ -23,7 +23,8 @@
struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters;
/** AutoLeave is true if the configuration is joint and a transition to the
/**
* autoLeave is true if the configuration is joint and a transition to the
* incoming configuration should be carried out automatically by Raft when
* this is possible. If false, the configuration will be joint until the
* application initiates the transition manually.
......@@ -86,7 +87,7 @@ struct SSyncRaftProgressTracker {
SSyncRaftProgress progressMap[TSDB_MAX_REPLICA];
SyncRaftVoteRespType votes[TSDB_MAX_REPLICA];
SyncRaftVoteResult votes[TSDB_MAX_REPLICA];
int maxInflight;
};
......@@ -97,4 +98,16 @@ void syncRaftResetVotes(SSyncRaftProgressTracker*);
typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
/**
* syncRaftRecordVote records that the node with the given id voted for this Raft
* instance if v == true (and declined it otherwise).
**/
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
**/
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_QUORUM_H
#define TD_SYNC_RAFT_QUORUM_H
/**
* SSyncRaftVoteResult indicates the outcome of a vote.
**/
typedef enum {
/**
* SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
* votes, i.e. neither "yes" or "no" has reached quorum yet.
**/
SYNC_RAFT_VOTE_PENDING = 1,
/**
* SYNC_RAFT_VOTE_LOST indicates that the quorum has voted "no".
**/
SYNC_RAFT_VOTE_LOST = 2,
/**
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
**/
SYNC_RAFT_VOTE_WON = 3,
} SSyncRaftVoteResult;
#endif /* TD_SYNC_RAFT_QUORUM_H */
\ No newline at end of file
......@@ -18,13 +18,21 @@
#include "taosdef.h"
#include "sync.h"
#include "sync_type.h"
/**
* JointConfig is a configuration of two groups of (possibly overlapping)
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping)
* majority configurations. Decisions require the support of both majorities.
**/
typedef struct SSyncRaftQuorumJointConfig {
SyncNodeId majorityConfig[2][TSDB_MAX_REPLICA];
SSyncCluster majorityConfig[2];
}SSyncRaftQuorumJointConfig;
/**
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
**/
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
#define _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
#include "sync.h"
#include "sync_type.h"
/**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending (i.e. neither a quorum of
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
**/
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
......@@ -61,6 +61,6 @@ typedef enum {
//reject the vote request
SYNC_RAFT_VOTE_RESP_REJECT = 2,
} SyncRaftVoteRespType;
} SyncRaftVoteResult;
#endif /* _TD_LIBS_SYNC_TYPE_H */
......@@ -26,23 +26,6 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int convertClear(SSyncRaft* pRaft);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int triggerAll(SSyncRaft* pRaft);
static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft);
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static bool maybeCommit(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft);
static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncNode* pNode = pRaft->pNode;
SSyncServerState serverState;
......@@ -136,124 +119,6 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
return 0;
}
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
convertClear(pRaft);
pRaft->stepFp = stepFollower;
resetRaft(pRaft, term);
pRaft->tickFp = tickElection;
pRaft->leaderId = leaderId;
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
/**
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
* r.Term or change r.Vote.
**/
pRaft->stepFp = stepCandidate;
pRaft->tickFp = tickElection;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
pRaft->candidateState.inPreVote = true;
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
pRaft->candidateState.inPreVote = false;
pRaft->stepFp = stepCandidate;
// become candidate make term+1
resetRaft(pRaft, pRaft->term + 1);
pRaft->tickFp = tickElection;
pRaft->voteFor = pRaft->selfId;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
pRaft->stepFp = stepLeader;
resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_ROLE_LEADER;
// TODO: check if there is pending config log
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
if (nPendingConf > 1) {
syncFatal("unexpected multiple uncommitted config entry");
}
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
// after become leader, send a no-op log
SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
if (entry == NULL) {
return;
}
*entry = (SSyncRaftEntry) {
.buffer = (SSyncBuffer) {
.data = NULL,
.len = 0,
}
};
appendEntries(pRaft, entry, 1);
//syncRaftTriggerHeartbeat(pRaft);
}
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
triggerAll(pRaft);
}
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
// electionTimeoutTick in [3,6] tick
pRaft->randomizedElectionTimeout = taosRand() % 4 + 3;
}
bool syncRaftIsPromotable(SSyncRaft* pRaft) {
return pRaft->selfId != SYNC_NON_NODE_ID;
}
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout;
}
int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->cluster.replica / 2 + 1;
}
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum) {
/*
if (accept) {
syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else {
syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
}
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
int granted = 0, rejected = 0;
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++;
else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++;
}
if (rejectNum) *rejectNum = rejected;
return granted;
*/
return 0;
}
/**
* pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower.
......@@ -335,138 +200,4 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
}
return true;
}
static int convertClear(SSyncRaft* pRaft) {
}
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
/**
* Only handle vote responses corresponding to our candidacy (while in
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* our pre-candidate state).
**/
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_PROP) {
return 0;
}
if (msgType == RAFT_MSG_VOTE_RESP) {
syncRaftHandleVoteRespMessage(pRaft, pMsg);
return 0;
} else if (msgType == RAFT_MSG_APPEND) {
syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
}
return 0;
}
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
return 0;
}
/**
* tickElection is run by followers and candidates per tick.
**/
static void tickElection(SSyncRaft* pRaft) {
pRaft->electionElapsed += 1;
if (!syncRaftIsPromotable(pRaft)) {
return;
}
if (!syncRaftIsPastElectionTimeout(pRaft)) {
return;
}
// election timeout
pRaft->electionElapsed = 0;
SSyncMessage msg;
syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
}
static void tickHeartbeat(SSyncRaft* pRaft) {
}
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm term = pRaft->term;
int i;
for (i = 0; i < n; ++i) {
entries[i].term = term;
entries[i].index = lastIndex + 1 + i;
}
syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]);
syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of maybeCommit's return, our caller will call bcastAppend.
maybeCommit(pRaft);
}
/**
* maybeCommit attempts to advance the commit index. Returns true if
* the commit index changed (in which case the caller should call
* r.bcastAppend).
**/
static bool maybeCommit(SSyncRaft* pRaft) {
return true;
}
/**
* trigger I/O requests for newly appended log entries or heartbeats.
**/
static int triggerAll(SSyncRaft* pRaft) {
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (i == pRaft->cluster.selfIndex) {
continue;
}
syncRaftReplicate(pRaft, i);
}
}
static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID;
}
static void initProgress(int i, SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress(i, (SSyncRaft*)arg, progress);
}
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
if (pRaft->term != term) {
pRaft->term = term;
pRaft->voteFor = SYNC_NON_NODE_ID;
}
pRaft->leaderId = SYNC_NON_NODE_ID;
pRaft->electionElapsed = 0;
pRaft->heartbeatElapsed = 0;
syncRaftRandomizedElectionTimeout(pRaft);
abortLeaderTransfer(pRaft);
syncRaftResetVotes(pRaft->tracker);
syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft);
pRaft->pendingConfigIndex = 0;
pRaft->uncommittedSize = 0;
}
}
\ No newline at end of file
......@@ -23,6 +23,11 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
bool preVote;
RaftMessageType voteMsgType;
if (syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftBecomePreCandidate(pRaft);
preVote = true;
......@@ -36,8 +41,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
}
int quorum = syncRaftQuorum(pRaft);
int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, preVote, true, NULL);
if (quorum <= granted) {
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
if (result == SYNC_RAFT_VOTE_WON) {
/**
* We won the election after voting for ourselves (which must mean that
* this is a single-node cluster). Advance to the next state.
......
......@@ -20,10 +20,14 @@
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
syncDebug("%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfId);
syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
if (!syncRaftIsPromotable(pRaft)) {
syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId);
return 0;
}
// if there is pending uncommitted config,cannot start election
if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) {
syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply",
......
......@@ -23,6 +23,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
int quorum;
int voterIndex;
assert(pRaft->state == TAOS_SYNC_ROLE_CANDIDATE);
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voterIndex == -1) {
syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
......@@ -34,24 +36,23 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
granted = syncRaftNumOfGranted(pRaft, pMsg->from,
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from,
pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
!pMsg->voteResp.rejected, &rejected);
quorum = syncRaftQuorum(pRaft);
!pMsg->voteResp.rejected, &rejected, &granted);
syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections",
pRaft->selfGroupId, pRaft->selfId, quorum, granted, rejected);
if (granted >= quorum) {
if (pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
if (result == SYNC_RAFT_VOTE_WON) {
if (pRaft->candidateState.inPreVote) {
syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else {
syncRaftBecomeLeader(pRaft);
}
syncRaftBecomeLeader(pRaft);
return 0;
} else if (rejected == quorum) {
}
} else if (result == SYNC_RAFT_VOTE_LOST) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
}
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"
static int convertClear(SSyncRaft* pRaft);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int triggerAll(SSyncRaft* pRaft);
static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft);
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
static bool maybeCommit(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft);
static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
convertClear(pRaft);
pRaft->stepFp = stepFollower;
resetRaft(pRaft, term);
pRaft->tickFp = tickElection;
pRaft->leaderId = leaderId;
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
/**
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
* r.Term or change r.Vote.
**/
pRaft->stepFp = stepCandidate;
pRaft->tickFp = tickElection;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
pRaft->candidateState.inPreVote = true;
syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
convertClear(pRaft);
pRaft->candidateState.inPreVote = false;
pRaft->stepFp = stepCandidate;
// become candidate make term+1
resetRaft(pRaft, pRaft->term + 1);
pRaft->tickFp = tickElection;
pRaft->voteFor = pRaft->selfId;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
pRaft->stepFp = stepLeader;
resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_ROLE_LEADER;
// TODO: check if there is pending config log
int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
if (nPendingConf > 1) {
syncFatal("unexpected multiple uncommitted config entry");
}
syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
// after become leader, send a no-op log
SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
if (entry == NULL) {
return;
}
*entry = (SSyncRaftEntry) {
.buffer = (SSyncBuffer) {
.data = NULL,
.len = 0,
}
};
appendEntries(pRaft, entry, 1);
//syncRaftTriggerHeartbeat(pRaft);
}
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
triggerAll(pRaft);
}
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
// electionTimeoutTick in [3,6] tick
pRaft->randomizedElectionTimeout = taosRand() % 4 + 3;
}
bool syncRaftIsPromotable(SSyncRaft* pRaft) {
return pRaft->selfId != SYNC_NON_NODE_ID;
}
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout;
}
int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->cluster.replica / 2 + 1;
}
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool grant,
int* rejected, int *granted) {
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
if (voterIndex == -1) {
return SYNC_RAFT_VOTE_PENDING;
}
if (grant) {
syncInfo("[%d:%d] received grant (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else {
syncInfo("[%d:%d] received rejection (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
}
syncRaftRecordVote(pRaft->tracker, voterIndex, grant);
return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
}
/*
if (accept) {
syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else {
syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
}
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
int granted = 0, rejected = 0;
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++;
else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++;
}
if (rejectNum) *rejectNum = rejected;
return granted;
*/
static int convertClear(SSyncRaft* pRaft) {
}
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
/**
* Only handle vote responses corresponding to our candidacy (while in
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* our pre-candidate state).
**/
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_PROP) {
return 0;
}
if (msgType == RAFT_MSG_VOTE_RESP) {
syncRaftHandleVoteRespMessage(pRaft, pMsg);
return 0;
} else if (msgType == RAFT_MSG_APPEND) {
syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
}
return 0;
}
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
return 0;
}
/**
* tickElection is run by followers and candidates per tick.
**/
static void tickElection(SSyncRaft* pRaft) {
pRaft->electionElapsed += 1;
if (!syncRaftIsPromotable(pRaft)) {
return;
}
if (!syncRaftIsPastElectionTimeout(pRaft)) {
return;
}
// election timeout
pRaft->electionElapsed = 0;
SSyncMessage msg;
syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
}
static void tickHeartbeat(SSyncRaft* pRaft) {
}
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm term = pRaft->term;
int i;
for (i = 0; i < n; ++i) {
entries[i].term = term;
entries[i].index = lastIndex + 1 + i;
}
syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]);
syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of maybeCommit's return, our caller will call bcastAppend.
maybeCommit(pRaft);
}
/**
* maybeCommit attempts to advance the commit index. Returns true if
* the commit index changed (in which case the caller should call
* r.bcastAppend).
**/
static bool maybeCommit(SSyncRaft* pRaft) {
return true;
}
/**
* trigger I/O requests for newly appended log entries or heartbeats.
**/
static int triggerAll(SSyncRaft* pRaft) {
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (i == pRaft->cluster.selfIndex) {
continue;
}
syncRaftReplicate(pRaft, i);
}
}
static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID;
}
static void initProgress(int i, SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress(i, (SSyncRaft*)arg, progress);
}
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
if (pRaft->term != term) {
pRaft->term = term;
pRaft->voteFor = SYNC_NON_NODE_ID;
}
pRaft->leaderId = SYNC_NON_NODE_ID;
pRaft->electionElapsed = 0;
pRaft->heartbeatElapsed = 0;
syncRaftRandomizedElectionTimeout(pRaft);
abortLeaderTransfer(pRaft);
syncRaftResetVotes(pRaft->tracker);
syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft);
pRaft->pendingConfigIndex = 0;
pRaft->uncommittedSize = 0;
}
\ No newline at end of file
......@@ -34,6 +34,8 @@ void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress)
.matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0,
.nextIndex = syncRaftLogLastIndex(pRaft->log) + 1,
.inflights = inflights,
.isLearner = false,
.state = PROGRESS_STATE_PROBE,
};
}
......
......@@ -25,7 +25,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
}
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA);
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteResult) * TSDB_MAX_REPLICA);
}
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
......@@ -34,4 +34,43 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi
SSyncRaftProgress* progress = &(tracker->progressMap[i]);
visit(i, progress, arg);
}
}
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) {
return;
}
tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
}
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
**/
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
int i;
SSyncRaftProgress* progress;
int r, g;
for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) {
progress = &(tracker->progressMap[i]);
if (progress->id == SYNC_NON_NODE_ID) {
continue;
}
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
continue;
}
if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
g++;
} else {
r++;
}
}
if (rejected) *rejected = r;
if (granted) *granted = g;
return syncRaftVoteResult(&(tracker->config.voters), tracker->votes);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_quorum_majority.h"
#include "sync_raft_quorum_joint.h"
#include "sync_raft_quorum.h"
/**
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
**/
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) {
SyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes);
SyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes);
if (r1 == r2) {
// If they agree, return the agreed state.
return r1;
}
if (r1 == SYNC_RAFT_VOTE_LOST || r2 == SYNC_RAFT_VOTE_LOST) {
// If either config has lost, loss is the only possible outcome.
return SYNC_RAFT_VOTE_LOST;
}
// One side won, the other one is pending, so the whole outcome is.
return SYNC_RAFT_VOTE_PENDING;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_quorum.h"
#include "sync_raft_quorum_majority.h"
/**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending (i.e. neither a quorum of
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
**/
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) {
if (config->replica == 0) {
return SYNC_RAFT_VOTE_WON;
}
int i, g, r, missing;
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) {
if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
continue;
}
if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
missing += 1;
} else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
g +=1;
} else {
r += 1;
}
}
int quorum = config->replica / 2 + 1;
if (g >= quorum) {
return SYNC_RAFT_VOTE_WON;
}
if (r + missing >= quorum) {
return SYNC_RAFT_VOTE_PENDING;
}
return SYNC_RAFT_VOTE_LOST;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册