diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h index 993f863f330a7f63733900849b91f3f6dee722a5..ac9bbb5e55d89f0f1c4952dad15c1c900828b631 100644 --- a/source/libs/sync/inc/raft_configuration.h +++ b/source/libs/sync/inc/raft_configuration.h @@ -20,7 +20,7 @@ #include "sync_type.h" // return -1 if cannot find this id -int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id); +int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index 41b605b0d24cdb06c93ce0fd1a8fbfdc2eb10c2f..bab9932fb537cdee0321bc75a231ab807d1acb28 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -37,6 +37,8 @@ SSyncRaftLog* syncRaftLogOpen(); SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog); +SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog); + SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); @@ -50,4 +52,13 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index); int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, SSyncRaftEntry **ppEntries, int *n); +void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index, + SSyncRaftEntry *pEntries, int n); + +bool syncRaftLogMatchTerm(); + +static FORCE_INLINE bool syncRaftLogIsCommitted(SSyncRaftLog* pLog, SyncIndex index) { + return pLog->commitIndex > index; +} + #endif /* _TD_LIBS_SYNC_RAFT_LOG_H */ diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index 58090a31f12f755332faa7060fb7c16c061de3e1..2cb625d1fbd8e866a692098bb54befe29cef1f37 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -65,10 +65,10 @@ typedef struct RaftMsg_VoteResp { typedef struct RaftMsg_Append_Entries { // index of log entry preceeding new ones - SyncIndex prevIndex; + SyncIndex index; // term of entry at prevIndex - SyncTerm prevTerm; + SyncTerm term; // leader's commit index. SyncIndex commitIndex; @@ -80,6 +80,10 @@ typedef struct RaftMsg_Append_Entries { SSyncRaftEntry* entries; } RaftMsg_Append_Entries; +typedef struct RaftMsg_Append_Resp { + SyncIndex index; +} RaftMsg_Append_Resp; + typedef struct SSyncMessage { RaftMessageType msgType; SyncTerm term; @@ -95,6 +99,7 @@ typedef struct SSyncMessage { RaftMsg_VoteResp voteResp; RaftMsg_Append_Entries appendEntries; + RaftMsg_Append_Resp appendResp; }; } SSyncMessage; @@ -167,7 +172,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo } static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from, - SyncTerm term, SyncIndex prevIndex, SyncTerm prevTerm, + SyncTerm term, SyncIndex logIndex, SyncTerm logTerm, SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); if (pMsg == NULL) { @@ -179,8 +184,8 @@ static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNode .term = term, .msgType = RAFT_MSG_APPEND, .appendEntries = (RaftMsg_Append_Entries) { - .prevIndex = prevIndex, - .prevTerm = prevTerm, + .index = logIndex, + .term = logTerm, .commitIndex = commitIndex, .nEntries = nEntries, .entries = entries, @@ -190,6 +195,24 @@ static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNode return pMsg; } +static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term) { + SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); + if (pMsg == NULL) { + return NULL; + } + *pMsg = (SSyncMessage) { + .groupId = groupId, + .from = from, + .term = term, + .msgType = RAFT_MSG_APPEND_RESP, + .appendResp = (RaftMsg_Append_Resp) { + + }, + }; + + return pMsg; +} + static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) { return msgType == RAFT_MSG_INTERNAL_PROP || msgType == RAFT_MSG_INTERNAL_ELECTION; @@ -209,5 +232,6 @@ void syncFreeMessage(const SSyncMessage* pMsg); int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index dca5c4cf0887712d4aa3c4e5c413e660d8e943a6..39e7a80d0bbed6268abde1822277422984ce389e 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -16,6 +16,7 @@ #include "raft.h" #include "raft_configuration.h" #include "raft_log.h" +#include "raft_replication.h" #include "syncInt.h" #define RAFT_READ_LOG_MAX_NUM 100 @@ -215,7 +216,7 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); } - int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, id); + int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); @@ -279,8 +280,38 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) } static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - // TODO - // if receive old term message, no need to continue + if (pRaft->checkQuorum && pMsg->msgType == RAFT_MSG_APPEND) { + /** + * We have received messages from a leader at a lower term. It is possible + * that these messages were simply delayed in the network, but this could + * also mean that this node has advanced its term number during a network + * partition, and it is now unable to either win an election or to rejoin + * the majority on the old term. If checkQuorum is false, this will be + * handled by incrementing term numbers in response to MsgVote with a + * higher term, but if checkQuorum is true we may not advance the term on + * MsgVote and must generate other messages to advance the term. The net + * result of these two features is to minimize the disruption caused by + * nodes that have been removed from the cluster's configuration: a + * removed node will send MsgVotes (or MsgPreVotes) which will be ignored, + * but it will not receive MsgApp or MsgHeartbeat, so it will not create + * disruptive term increases + **/ + int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); + if (peerIndex < 0) { + return true; + } + SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); + if (msg == NULL) { + return true; + } + + pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex])); + } else { + // ignore other cases + syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]", + pRaft->selfGroupId, pRaft->selfId, pRaft->term, pMsg->msgType, pMsg->from, pMsg->term); + } + return true; } @@ -308,6 +339,9 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (msgType == RAFT_MSG_VOTE_RESP) { syncRaftHandleVoteRespMessage(pRaft, pMsg); return 0; + } else if (msgType == RAFT_MSG_APPEND) { + syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from); + syncRaftHandleAppendEntriesMessage(pRaft, pMsg); } return 0; } @@ -353,7 +387,7 @@ static int triggerAll(SSyncRaft* pRaft) { continue; } - + syncRaftReplicate(pRaft, i); } } diff --git a/source/libs/sync/src/raft_configuration.c b/source/libs/sync/src/raft_configuration.c index 6f3a27e7c0aacb98bd6cacace7a8a6635c5ea7e3..e16cb349892a78fef6ac58f1fb3d0bc9ecd66064 100644 --- a/source/libs/sync/src/raft_configuration.c +++ b/source/libs/sync/src/raft_configuration.c @@ -16,7 +16,7 @@ #include "raft_configuration.h" #include "raft.h" -int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id) { +int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) { return (int)(id); } diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c new file mode 100644 index 0000000000000000000000000000000000000000..d4d362848f2b3eb513fd8175162fb3df6db7d07b --- /dev/null +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncInt.h" +#include "raft.h" +#include "raft_log.h" +#include "raft_configuration.h" +#include "raft_message.h" + +int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); + + int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); + + if (peerIndex < 0) { + return 0; + } + + SSyncMessage* pRespMsg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); + if (pRespMsg == NULL) { + return 0; + } + + RaftMsg_Append_Entries *appendResp = &(pMsg->appendResp); + // ignore committed logs + if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) { + appendResp->index = pRaft->log->commitIndex; + goto out; + } + + syncInfo("[%d:%d] recv append from %d index %" PRId64"", + pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index); + +out: + pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex])); + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 87ef468d57ddbb69e3ce9ebc3a6aa31c4a3844e0..2fab8ad5a987068653dfa574ed8f840a6d4cc885 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -23,7 +23,7 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SSyncMessage* pRespMsg; - int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from); + int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); if (voteIndex == -1) { return 0; } diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 6e88b03b5a5f430a4d849095f23415b1a42ccbd7..05464256af0089c45c6f6827b4c199d2722f4e50 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -23,7 +23,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int quorum; int voterIndex; - voterIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from); + voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); if (voterIndex == -1) { syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); return 0; diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index ee51fcbef3c31e87cf2bba55836e2ab9973a6168..a26650cbb75abc312b146abfea25cdef925035d2 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -23,6 +23,10 @@ SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog) { return 0; } +SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog) { + return 0; +} + SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) { return 0; } @@ -46,4 +50,9 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) { int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, SSyncRaftEntry **ppEntries, int *n) { return 0; +} + +void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index, + SSyncRaftEntry *pEntries, int n) { + return; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 02d9804f7e47fc0891b0151eb6d52b546ac7b856..b6ff1fb329f80140280073e12a24d68f72fe1cf1 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -33,13 +33,62 @@ int syncRaftReplicate(SSyncRaft* pRaft, int i) { } SyncIndex nextIndex = syncRaftProgressNextIndex(progress); - SyncIndex prevIndex = nextIndex - 1; - SyncTerm prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); + SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log); + bool inSnapshot = syncRaftProgressInSnapshot(progress); + SyncIndex prevIndex; + SyncTerm prevTerm; + + /** + * From Section 3.5: + * + * When sending an AppendEntries RPC, the leader includes the index and + * term of the entry in its log that immediately precedes the new + * entries. If the follower does not find an entry in its log with the + * same index and term, then it refuses the new entries. The consistency + * check acts as an induction step: the initial empty state of the logs + * satisfies the Log Matching Property, and the consistency check + * preserves the Log Matching Property whenever logs are extended. As a + * result, whenever AppendEntries returns successfully, the leader knows + * that the follower's log is identical to its own log up through the new + * entries (Log Matching Property in Figure 3.2). + **/ + if (nextIndex == 1) { + /** + * We're including the very first entry, so prevIndex and prevTerm are + * null. If the first entry is not available anymore, send the last + * snapshot if we're not already sending one. + **/ + if (snapshotIndex > 0 && !inSnapshot) { + goto send_snapshot; + } + + // otherwise send append entries from start + prevIndex = 0; + prevTerm = 0; + } else { + /** + * Set prevIndex and prevTerm to the index and term of the entry at + * nextIndex - 1. + **/ + prevIndex = nextIndex - 1; + prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); + /** + * If the entry is not anymore in our log, send the last snapshot if we're + * not doing so already. + **/ + if (prevTerm == SYNC_NON_TERM && !inSnapshot) { + goto send_snapshot; + } + } - if (prevTerm == SYNC_NON_TERM && !syncRaftProgressInSnapshot(progress)) { - goto send_snapshot; + /* Send empty AppendEntries RPC when installing a snaphot */ + if (inSnapshot) { + prevIndex = syncRaftLogLastIndex(pRaft->log); + prevTerm = syncRaftLogLastTerm(pRaft->log); } + return sendAppendEntries(pRaft, i, prevIndex, prevTerm); + send_snapshot: if (syncRaftProgressRecentActive(progress)) { /* Only send a snapshot when we have heard from the server */ @@ -69,7 +118,7 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT nEntry, entries); if (msg == NULL) { - return 0; + goto err_release_log; } pRaft->io.send(msg, pNode); @@ -87,4 +136,8 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT syncRaftProgressUpdateSendTick(progress, pRaft->currentTick); return 0; + +err_release_log: + syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry); + return 0; } \ No newline at end of file