提交 da106e29 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft vote message handle

上级 c25d174f
......@@ -38,6 +38,7 @@ typedef struct RaftCandidateState {
} RaftCandidateState;
typedef struct SSyncRaftIOMethods {
// send SSyncMessage to node
int (*send)(const SSyncMessage* pMsg, const SNodeInfo* pNode);
} SSyncRaftIOMethods;
......@@ -104,6 +105,7 @@ struct SSyncRaft {
SSyncRaftIOMethods io;
// union different state data
union {
RaftLeaderState leaderState;
RaftCandidateState candidateState;
......
......@@ -35,6 +35,8 @@ SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog);
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog);
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term);
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog);
bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog);
......
......@@ -20,10 +20,13 @@
#include "sync_type.h"
/**
* below define message type which handled by Raft node thread
* internal message, which communicate in threads, start with RAFT_MSG_INTERNAL_*,
* internal message use pointer only, need not to be decode/encode
* outter message start with RAFT_MSG_*, need to implement its decode/encode functions
* below define message type which handled by Raft.
*
* internal message, which communicate between threads, start with RAFT_MSG_INTERNAL_*.
* internal message use pointer only and stack memory, need not to be decode/encode and free.
*
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
* need to implement its decode/encode functions.
**/
typedef enum RaftMessageType {
// client propose a cmd
......@@ -36,6 +39,7 @@ typedef enum RaftMessageType {
RAFT_MSG_VOTE_RESP = 4,
RAFT_MSG_APPEND = 5,
RAFT_MSG_APPEND_RESP = 6,
} RaftMessageType;
typedef struct RaftMsgInternal_Prop {
......@@ -55,7 +59,7 @@ typedef struct RaftMsg_Vote {
} RaftMsg_Vote;
typedef struct RaftMsg_VoteResp {
bool reject;
bool rejected;
SyncRaftElectionType cType;
} RaftMsg_VoteResp;
......@@ -115,6 +119,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
.from = from,
.to = to,
.term = term,
.msgType = RAFT_MSG_VOTE,
.vote = (RaftMsg_Vote) {
.cType = cType,
.lastIndex = lastIndex,
......@@ -125,6 +130,26 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to,
SyncRaftElectionType cType, bool rejected) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.to = to,
.msgType = RAFT_MSG_VOTE_RESP,
.voteResp = (RaftMsg_VoteResp) {
.cType = cType,
.rejected = rejected,
},
};
return pMsg;
}
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
return msgType == RAFT_MSG_INTERNAL_PROP ||
msgType == RAFT_MSG_INTERNAL_ELECTION;
......@@ -142,6 +167,7 @@ void syncFreeMessage(const SSyncMessage* pMsg);
// message handlers
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
......@@ -107,7 +107,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
syncRaftHandleElectionMessage(pRaft, pMsg);
} else if (msgType == RAFT_MSG_VOTE) {
syncRaftHandleVoteMessage(pRaft, pMsg);
} else {
pRaft->stepFp(pRaft, pMsg);
}
......@@ -245,7 +245,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
if (syncIsPreVoteMsg(pMsg)) {
// Never change our term in response to a PreVote
} else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.reject) {
} else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.rejected) {
/**
* We send pre-vote requests with a term in our future. If the
* pre-vote is granted, we will increment our term when we get a
......
......@@ -15,6 +15,7 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_message.h"
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
......@@ -66,7 +67,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
continue;
}
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %d] sent %d request to %d at term %" PRId64 "",
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm,
lastIndex, voteMsgType, nodeId, pRaft->term);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_message.h"
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SSyncMessage* pRespMsg;
int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from);
if (voteIndex == -1) {
return 0;
}
bool grant;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
grant = canGrantVoteMessage(pRaft, pMsg);
pRespMsg = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->to, pMsg->vote.cType, !grant);
if (pRespMsg == NULL) {
return 0;
}
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d" \
"[logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, pRaft->voteFor,
grant ? "grant" : "reject",
pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term);
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex]));
return 0;
}
static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) {
return false;
}
if (!syncRaftLogIsUptodate(pRaft, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) {
return false;
}
return true;
}
\ No newline at end of file
......@@ -35,7 +35,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
granted = syncRaftNumOfGranted(pRaft, pMsg->from,
pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
!pMsg->voteResp.reject, &rejected);
!pMsg->voteResp.rejected, &rejected);
quorum = syncRaftQuorum(pRaft);
syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections",
......
......@@ -27,6 +27,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return 0;
}
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) {
return true;
}
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) {
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册