提交 aab981f6 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft election message handle

上级 349a6a47
...@@ -26,12 +26,13 @@ extern "C" { ...@@ -26,12 +26,13 @@ extern "C" {
typedef int32_t SyncNodeId; typedef int32_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm; typedef uint64_t SyncTerm;
typedef enum { typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0, TAOS_SYNC_ROLE_FOLLOWER = 0,
TAOS_SYNC_ROLE_CANDIDATE = 1, TAOS_SYNC_ROLE_CANDIDATE = 1,
TAOS_SYNC_ROLE_LEADER = 2, TAOS_SYNC_ROLE_LEADER = 2,
TAOS_SYNC_ROLE_PRE_CANDIDATE = 3,
} ESyncRole; } ESyncRole;
typedef struct { typedef struct {
...@@ -111,7 +112,7 @@ typedef struct SSyncLogStore { ...@@ -111,7 +112,7 @@ typedef struct SSyncLogStore {
typedef struct SSyncServerState { typedef struct SSyncServerState {
SyncNodeId voteFor; SyncNodeId voteFor;
SSyncTerm term; SyncTerm term;
SyncIndex commitIndex; SyncIndex commitIndex;
} SSyncServerState; } SSyncServerState;
......
...@@ -43,10 +43,11 @@ struct SSyncRaft { ...@@ -43,10 +43,11 @@ struct SSyncRaft {
SSyncInfo info; SSyncInfo info;
SSyncTerm term; SyncTerm term;
SyncNodeId voteFor; SyncNodeId voteFor;
SyncNodeId selfId; SyncNodeId selfId;
SyncGroupId selfGroupId;
/** /**
* the leader id * the leader id
...@@ -100,14 +101,25 @@ struct SSyncRaft { ...@@ -100,14 +101,25 @@ struct SSyncRaft {
SyncRaftTickFp tickFp; SyncRaftTickFp tickFp;
}; };
typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
} SyncRaftCampaignType;
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft); int32_t syncRaftTick(SSyncRaft* pRaft);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId); void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
void syncRaftBecomeLeader(SSyncRaft* pRaft);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft);
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept);
#endif /* _TD_LIBS_SYNC_RAFT_H */ #endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_LIBS_SYNC_RAFT_MESSAGE_H #define _TD_LIBS_SYNC_RAFT_MESSAGE_H
#include "sync.h" #include "sync.h"
#include "sync_type.h"
/** /**
* below define message type which handled by Raft node thread * below define message type which handled by Raft node thread
...@@ -54,7 +55,7 @@ typedef struct RaftMsg_PreVoteResp { ...@@ -54,7 +55,7 @@ typedef struct RaftMsg_PreVoteResp {
typedef struct SSyncMessage { typedef struct SSyncMessage {
RaftMessageType msgType; RaftMessageType msgType;
SSyncTerm term; SyncTerm term;
SyncNodeId from; SyncNodeId from;
SyncNodeId to; SyncNodeId to;
...@@ -94,11 +95,19 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo ...@@ -94,11 +95,19 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
return pMsg; return pMsg;
} }
static FORCE_INLINE bool syncIsInternalMsg(const SSyncMessage* pMsg) { static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
return pMsg->msgType == RAFT_MSG_INTERNAL_PROP || return msgType == RAFT_MSG_INTERNAL_PROP ||
pMsg->msgType == RAFT_MSG_INTERNAL_ELECTION; msgType == RAFT_MSG_INTERNAL_ELECTION;
}
static FORCE_INLINE RaftMessageType SyncRaftVoteRespMsgType(RaftMessageType msgType) {
if (msgType == RAFT_MSG_VOTE) return RAFT_MSG_PRE_VOTE_RESP;
return RAFT_MSG_PRE_VOTE_RESP;
} }
void syncFreeMessage(const SSyncMessage* pMsg); void syncFreeMessage(const SSyncMessage* pMsg);
// message handlers
void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
...@@ -67,13 +67,13 @@ int raftLogNumEntries(const RaftLog* pLog); ...@@ -67,13 +67,13 @@ int raftLogNumEntries(const RaftLog* pLog);
/** /**
* return last term of in memory log, return 0 if log is empty * return last term of in memory log, return 0 if log is empty
**/ **/
SSyncTerm raftLogLastTerm(RaftLog* pLog); SyncTerm raftLogLastTerm(RaftLog* pLog);
/** /**
* return term of log with the given index, return 0 if the term of index cannot be found * return term of log with the given index, return 0 if the term of index cannot be found
* , errCode will save the error code. * , errCode will save the error code.
**/ **/
SSyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode); SyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode);
/** /**
* Get the last index of the most recent snapshot. Return 0 if there are no * * Get the last index of the most recent snapshot. Return 0 if there are no *
...@@ -83,7 +83,7 @@ SyncIndex raftLogSnapshotIndex(RaftLog* pLog); ...@@ -83,7 +83,7 @@ SyncIndex raftLogSnapshotIndex(RaftLog* pLog);
/* Append a new entry to the log. */ /* Append a new entry to the log. */
int raftLogAppend(RaftLog* pLog, int raftLogAppend(RaftLog* pLog,
SSyncTerm term, SyncTerm term,
const SSyncBuffer *buf); const SSyncBuffer *buf);
/** /**
......
...@@ -31,7 +31,7 @@ static void tickHeartbeat(SSyncRaft* pRaft); ...@@ -31,7 +31,7 @@ static void tickHeartbeat(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft); static void abortLeaderTransfer(SSyncRaft* pRaft);
static void resetRaft(SSyncRaft* pRaft, SSyncTerm term); static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncNode* pNode = pRaft->pNode; SSyncNode* pNode = pRaft->pNode;
...@@ -84,7 +84,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -84,7 +84,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
} }
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
syncDebug("from "); syncDebug("from %d, to %d, type:%d, term:%" PRId64 ", state:%d",
pMsg->from, pMsg->to, pMsg->msgType, pMsg->term, pRaft->state);
if (preHandleMessage(pRaft, pMsg)) { if (preHandleMessage(pRaft, pMsg)) {
syncFreeMessage(pMsg); syncFreeMessage(pMsg);
return 0; return 0;
...@@ -92,7 +94,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { ...@@ -92,7 +94,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
RaftMessageType msgType = pMsg->msgType; RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_ELECTION) { if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
syncRaftHandleElectionMessage(pRaft, pMsg);
} else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) { } else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) {
} else { } else {
...@@ -107,7 +109,7 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { ...@@ -107,7 +109,7 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
return 0; return 0;
} }
void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId) { void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
pRaft->stepFp = stepFollower; pRaft->stepFp = stepFollower;
resetRaft(pRaft, term); resetRaft(pRaft, term);
pRaft->tickFp = tickElection; pRaft->tickFp = tickElection;
...@@ -115,6 +117,40 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderI ...@@ -115,6 +117,40 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderI
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER; pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
} }
void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
/**
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
* r.Term or change r.Vote.
**/
pRaft->stepFp = stepCandidate;
pRaft->tickFp = tickElection;
pRaft->state = TAOS_SYNC_ROLE_PRE_CANDIDATE;
syncInfo("[%d:%d] became pre-candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
pRaft->stepFp = stepCandidate;
// become candidate make term+1
resetRaft(pRaft, pRaft->term + 1);
pRaft->tickFp = tickElection;
pRaft->voteFor = pRaft->selfId;
pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
syncInfo("[%d:%d] became candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftBecomeLeader(SSyncRaft* pRaft) {
assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
pRaft->stepFp = stepLeader;
resetRaft(pRaft, pRaft->term);
pRaft->leaderId = pRaft->leaderId;
pRaft->state = TAOS_SYNC_ROLE_LEADER;
// TODO: check if there is pending config log
syncInfo("[%d:%d] became leader at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
}
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
// electionTimeoutTick in [3,6] tick // electionTimeoutTick in [3,6] tick
pRaft->electionTimeoutTick = taosRand() % 4 + 3; pRaft->electionTimeoutTick = taosRand() % 4 + 3;
...@@ -130,6 +166,20 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { ...@@ -130,6 +166,20 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
return pRaft->electionElapsed >= pRaft->electionTimeoutTick; return pRaft->electionElapsed >= pRaft->electionTimeoutTick;
} }
int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->leaderState.nProgress / 2 + 1;
}
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept) {
if (accept) {
} else {
}
}
/** /**
* pre-handle message, return true is no need to continue * pre-handle message, return true is no need to continue
* Handle the message term, which may result in our stepping down to a follower. * Handle the message term, which may result in our stepping down to a follower.
...@@ -166,6 +216,8 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) ...@@ -166,6 +216,8 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
* term. * term.
**/ **/
} else { } else {
syncInfo("%d [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]",
pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
syncRaftBecomeFollower(pRaft, pMsg->term, leaderId); syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
} }
...@@ -218,7 +270,7 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { ...@@ -218,7 +270,7 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID; pRaft->leadTransferee = SYNC_NON_NODE_ID;
} }
static void resetRaft(SSyncRaft* pRaft, SSyncTerm term) { static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
if (pRaft->term != term) { if (pRaft->term != term) {
pRaft->term = term; pRaft->term = term;
pRaft->voteFor = SYNC_NON_NODE_ID; pRaft->voteFor = SYNC_NON_NODE_ID;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_message.h"
static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType);
void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
syncDebug("%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfId);
return;
}
// TODO: is there pending uncommitted config?
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRaft->preVote) {
} else {
}
}
static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) {
SyncTerm term;
RaftMessageType voteMsgType;
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftBecomePreCandidate(pRaft);
voteMsgType = RAFT_MSG_PRE_VOTE;
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = pRaft->term + 1;
} else {
syncRaftBecomeCandidate(pRaft);
voteMsgType = RAFT_MSG_VOTE;
term = pRaft->term;
}
int quorum = syncRaftQuorum(pRaft);
int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, SyncRaftVoteRespMsgType(voteMsgType), true);
if (quorum <= granted) {
/**
* We won the election after voting for ourselves (which must mean that
* this is a single-node cluster). Advance to the next state.
**/
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} else {
syncRaftBecomeLeader(pRaft);
}
return;
}
// broadcast vote message to other peers
}
\ No newline at end of file
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "raft_message.h" #include "raft_message.h"
void syncFreeMessage(const SSyncMessage* pMsg) { void syncFreeMessage(const SSyncMessage* pMsg) {
if (!syncIsInternalMsg(pMsg)) { if (!syncIsInternalMsg(pMsg->msgType)) {
free((SSyncMessage*)pMsg); free((SSyncMessage*)pMsg);
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册