提交 4022f360 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft append message handle

上级 aee5ebd1
......@@ -20,7 +20,7 @@
#include "sync_type.h"
// return -1 if cannot find this id
int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id);
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id);
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft);
......
......@@ -37,6 +37,8 @@ SSyncRaftLog* syncRaftLogOpen();
SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog);
SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog);
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog);
bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term);
......@@ -50,4 +52,13 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index);
int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize,
SSyncRaftEntry **ppEntries, int *n);
void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index,
SSyncRaftEntry *pEntries, int n);
bool syncRaftLogMatchTerm();
static FORCE_INLINE bool syncRaftLogIsCommitted(SSyncRaftLog* pLog, SyncIndex index) {
return pLog->commitIndex > index;
}
#endif /* _TD_LIBS_SYNC_RAFT_LOG_H */
......@@ -65,10 +65,10 @@ typedef struct RaftMsg_VoteResp {
typedef struct RaftMsg_Append_Entries {
// index of log entry preceeding new ones
SyncIndex prevIndex;
SyncIndex index;
// term of entry at prevIndex
SyncTerm prevTerm;
SyncTerm term;
// leader's commit index.
SyncIndex commitIndex;
......@@ -80,6 +80,10 @@ typedef struct RaftMsg_Append_Entries {
SSyncRaftEntry* entries;
} RaftMsg_Append_Entries;
typedef struct RaftMsg_Append_Resp {
SyncIndex index;
} RaftMsg_Append_Resp;
typedef struct SSyncMessage {
RaftMessageType msgType;
SyncTerm term;
......@@ -95,6 +99,7 @@ typedef struct SSyncMessage {
RaftMsg_VoteResp voteResp;
RaftMsg_Append_Entries appendEntries;
RaftMsg_Append_Resp appendResp;
};
} SSyncMessage;
......@@ -167,7 +172,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo
}
static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from,
SyncTerm term, SyncIndex prevIndex, SyncTerm prevTerm,
SyncTerm term, SyncIndex logIndex, SyncTerm logTerm,
SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
......@@ -179,8 +184,8 @@ static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNode
.term = term,
.msgType = RAFT_MSG_APPEND,
.appendEntries = (RaftMsg_Append_Entries) {
.prevIndex = prevIndex,
.prevTerm = prevTerm,
.index = logIndex,
.term = logTerm,
.commitIndex = commitIndex,
.nEntries = nEntries,
.entries = entries,
......@@ -190,6 +195,24 @@ static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNode
return pMsg;
}
static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.term = term,
.msgType = RAFT_MSG_APPEND_RESP,
.appendResp = (RaftMsg_Append_Resp) {
},
};
return pMsg;
}
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
return msgType == RAFT_MSG_INTERNAL_PROP ||
msgType == RAFT_MSG_INTERNAL_ELECTION;
......@@ -209,5 +232,6 @@ void syncFreeMessage(const SSyncMessage* pMsg);
int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
......@@ -16,6 +16,7 @@
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100
......@@ -215,7 +216,7 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
}
int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, id);
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
......@@ -279,8 +280,38 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
}
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
// TODO
// if receive old term message, no need to continue
if (pRaft->checkQuorum && pMsg->msgType == RAFT_MSG_APPEND) {
/**
* We have received messages from a leader at a lower term. It is possible
* that these messages were simply delayed in the network, but this could
* also mean that this node has advanced its term number during a network
* partition, and it is now unable to either win an election or to rejoin
* the majority on the old term. If checkQuorum is false, this will be
* handled by incrementing term numbers in response to MsgVote with a
* higher term, but if checkQuorum is true we may not advance the term on
* MsgVote and must generate other messages to advance the term. The net
* result of these two features is to minimize the disruption caused by
* nodes that have been removed from the cluster's configuration: a
* removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
* but it will not receive MsgApp or MsgHeartbeat, so it will not create
* disruptive term increases
**/
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
return true;
}
SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (msg == NULL) {
return true;
}
pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex]));
} else {
// ignore other cases
syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
pRaft->selfGroupId, pRaft->selfId, pRaft->term, pMsg->msgType, pMsg->from, pMsg->term);
}
return true;
}
......@@ -308,6 +339,9 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if (msgType == RAFT_MSG_VOTE_RESP) {
syncRaftHandleVoteRespMessage(pRaft, pMsg);
return 0;
} else if (msgType == RAFT_MSG_APPEND) {
syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
}
return 0;
}
......@@ -353,7 +387,7 @@ static int triggerAll(SSyncRaft* pRaft) {
continue;
}
syncRaftReplicate(pRaft, i);
}
}
......
......@@ -16,7 +16,7 @@
#include "raft_configuration.h"
#include "raft.h"
int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id) {
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) {
return (int)(id);
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_configuration.h"
#include "raft_message.h"
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries);
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
return 0;
}
SSyncMessage* pRespMsg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRespMsg == NULL) {
return 0;
}
RaftMsg_Append_Entries *appendResp = &(pMsg->appendResp);
// ignore committed logs
if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) {
appendResp->index = pRaft->log->commitIndex;
goto out;
}
syncInfo("[%d:%d] recv append from %d index %" PRId64"",
pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index);
out:
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex]));
return 0;
}
\ No newline at end of file
......@@ -23,7 +23,7 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SSyncMessage* pRespMsg;
int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from);
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voteIndex == -1) {
return 0;
}
......
......@@ -23,7 +23,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
int quorum;
int voterIndex;
voterIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from);
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voterIndex == -1) {
syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
return 0;
......
......@@ -23,6 +23,10 @@ SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog) {
return 0;
}
SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog) {
return 0;
}
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return 0;
}
......@@ -46,4 +50,9 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) {
int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize,
SSyncRaftEntry **ppEntries, int *n) {
return 0;
}
void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index,
SSyncRaftEntry *pEntries, int n) {
return;
}
\ No newline at end of file
......@@ -33,13 +33,62 @@ int syncRaftReplicate(SSyncRaft* pRaft, int i) {
}
SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
SyncIndex prevIndex = nextIndex - 1;
SyncTerm prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log);
bool inSnapshot = syncRaftProgressInSnapshot(progress);
SyncIndex prevIndex;
SyncTerm prevTerm;
/**
* From Section 3.5:
*
* When sending an AppendEntries RPC, the leader includes the index and
* term of the entry in its log that immediately precedes the new
* entries. If the follower does not find an entry in its log with the
* same index and term, then it refuses the new entries. The consistency
* check acts as an induction step: the initial empty state of the logs
* satisfies the Log Matching Property, and the consistency check
* preserves the Log Matching Property whenever logs are extended. As a
* result, whenever AppendEntries returns successfully, the leader knows
* that the follower's log is identical to its own log up through the new
* entries (Log Matching Property in Figure 3.2).
**/
if (nextIndex == 1) {
/**
* We're including the very first entry, so prevIndex and prevTerm are
* null. If the first entry is not available anymore, send the last
* snapshot if we're not already sending one.
**/
if (snapshotIndex > 0 && !inSnapshot) {
goto send_snapshot;
}
// otherwise send append entries from start
prevIndex = 0;
prevTerm = 0;
} else {
/**
* Set prevIndex and prevTerm to the index and term of the entry at
* nextIndex - 1.
**/
prevIndex = nextIndex - 1;
prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
/**
* If the entry is not anymore in our log, send the last snapshot if we're
* not doing so already.
**/
if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
goto send_snapshot;
}
}
if (prevTerm == SYNC_NON_TERM && !syncRaftProgressInSnapshot(progress)) {
goto send_snapshot;
/* Send empty AppendEntries RPC when installing a snaphot */
if (inSnapshot) {
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
}
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
send_snapshot:
if (syncRaftProgressRecentActive(progress)) {
/* Only send a snapshot when we have heard from the server */
......@@ -69,7 +118,7 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
nEntry, entries);
if (msg == NULL) {
return 0;
goto err_release_log;
}
pRaft->io.send(msg, pNode);
......@@ -87,4 +136,8 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);
return 0;
err_release_log:
syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册