From aee5ebd1ced03863c7b9a3267176f317ffe53b8b Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 5 Nov 2021 15:03:56 +0800 Subject: [PATCH] [TD-10645][raft]add raft append message handle --- source/libs/sync/inc/raft.h | 12 +-- source/libs/sync/inc/raft_log.h | 9 ++ source/libs/sync/inc/raft_message.h | 52 +++++++++-- source/libs/sync/inc/raft_progress.h | 43 +++++++-- source/libs/sync/inc/raft_replication.h | 25 ++++++ source/libs/sync/inc/sync_type.h | 15 +++- source/libs/sync/src/raft.c | 53 ++++++++--- source/libs/sync/src/raft_election.c | 2 +- .../libs/sync/src/raft_handle_vote_message.c | 7 +- .../sync/src/raft_handle_vote_resp_message.c | 4 +- source/libs/sync/src/raft_log.c | 9 ++ source/libs/sync/src/raft_progress.c | 16 +--- source/libs/sync/src/raft_replication.c | 90 +++++++++++++++++++ 13 files changed, 287 insertions(+), 50 deletions(-) create mode 100644 source/libs/sync/inc/raft_replication.h create mode 100644 source/libs/sync/src/raft_replication.c diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 2ce2dcb5de..dd3eed9e02 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -20,8 +20,6 @@ #include "sync_type.h" #include "raft_message.h" -#define SYNC_NON_NODE_ID -1 - typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct RaftLeaderState { @@ -49,7 +47,8 @@ struct SSyncRaft { // owner sync node SSyncNode* pNode; - //SSyncInfo info; + int maxMsgSize; + SSyncFSM fsm; SSyncLogStore logStore; SStateManager stateManager; @@ -74,7 +73,7 @@ struct SSyncRaft { /** * New configuration is ignored if there exists unapplied configuration. **/ - bool pendingConf; + bool hasPendingConf; SSyncCluster cluster; @@ -94,6 +93,9 @@ struct SSyncRaft { **/ uint16_t heartbeatElapsed; + // current tick count since start up + uint32_t currentTick; + // election timeout tick(random in [3:6] tick) uint16_t electionTimeoutTick; @@ -129,7 +131,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft); void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); -void syncRaftTriggerReplicate(SSyncRaft* pRaft); +void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); bool syncRaftIsPromotable(SSyncRaft* pRaft); diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index 3545bf7ba1..41b605b0d2 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -19,6 +19,10 @@ #include "sync.h" #include "sync_type.h" +struct SSyncRaftEntry { + +}; + struct SSyncRaftLog { SyncIndex uncommittedConfigIndex; @@ -41,4 +45,9 @@ int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog); +SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index); + +int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, + SSyncRaftEntry **ppEntries, int *n); + #endif /* _TD_LIBS_SYNC_RAFT_LOG_H */ diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index d51822f8b3..58090a31f1 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -63,12 +63,28 @@ typedef struct RaftMsg_VoteResp { SyncRaftElectionType cType; } RaftMsg_VoteResp; +typedef struct RaftMsg_Append_Entries { + // index of log entry preceeding new ones + SyncIndex prevIndex; + + // term of entry at prevIndex + SyncTerm prevTerm; + + // leader's commit index. + SyncIndex commitIndex; + + // size of the log entries array + int nEntries; + + // log entries array + SSyncRaftEntry* entries; +} RaftMsg_Append_Entries; + typedef struct SSyncMessage { RaftMessageType msgType; SyncTerm term; SyncGroupId groupId; SyncNodeId from; - SyncNodeId to; union { RaftMsgInternal_Prop propose; @@ -77,6 +93,8 @@ typedef struct SSyncMessage { RaftMsg_Vote vote; RaftMsg_VoteResp voteResp; + + RaftMsg_Append_Entries appendEntries; }; } SSyncMessage; @@ -107,7 +125,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo return pMsg; } -static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to, +static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term, SyncRaftElectionType cType, SyncIndex lastIndex, SyncTerm lastTerm) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); @@ -117,7 +135,6 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId *pMsg = (SSyncMessage) { .groupId = groupId, .from = from, - .to = to, .term = term, .msgType = RAFT_MSG_VOTE, .vote = (RaftMsg_Vote) { @@ -130,7 +147,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId return pMsg; } -static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to, +static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from, SyncRaftElectionType cType, bool rejected) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); if (pMsg == NULL) { @@ -139,7 +156,6 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo *pMsg = (SSyncMessage) { .groupId = groupId, .from = from, - .to = to, .msgType = RAFT_MSG_VOTE_RESP, .voteResp = (RaftMsg_VoteResp) { .cType = cType, @@ -150,12 +166,36 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo return pMsg; } +static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from, + SyncTerm term, SyncIndex prevIndex, SyncTerm prevTerm, + SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) { + SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); + if (pMsg == NULL) { + return NULL; + } + *pMsg = (SSyncMessage) { + .groupId = groupId, + .from = from, + .term = term, + .msgType = RAFT_MSG_APPEND, + .appendEntries = (RaftMsg_Append_Entries) { + .prevIndex = prevIndex, + .prevTerm = prevTerm, + .commitIndex = commitIndex, + .nEntries = nEntries, + .entries = entries, + }, + }; + + return pMsg; +} + static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) { return msgType == RAFT_MSG_INTERNAL_PROP || msgType == RAFT_MSG_INTERNAL_ELECTION; } -static FORCE_INLINE bool syncIsPreVoteRespMsg(SSyncMessage* pMsg) { +static FORCE_INLINE bool syncIsPreVoteRespMsg(const SSyncMessage* pMsg) { return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION; } diff --git a/source/libs/sync/inc/raft_progress.h b/source/libs/sync/inc/raft_progress.h index 159a80fa0e..5840468a5d 100644 --- a/source/libs/sync/inc/raft_progress.h +++ b/source/libs/sync/inc/raft_progress.h @@ -85,6 +85,9 @@ struct SSyncRaftProgress { **/ bool paused; + // last send append message tick + uint32_t lastSendTick; + /** * pendingSnapshotIndex is used in PROGRESS_SNAPSHOT. * If there is a pending snapshot, the pendingSnapshotIndex will be set to the @@ -116,7 +119,9 @@ int syncRaftProgressCreate(SSyncRaft* pRaft); **/ bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex); -void syncRaftProgressOptimisticNextIndex(SSyncRaft* pRaft, int i, SyncIndex nextIndex); +static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { + progress->nextIndex = nextIndex + 1; +} /** * syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message. @@ -131,7 +136,35 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i, * MsgApps, is currently waiting for a snapshot, or has reached the * MaxInflightMsgs limit. **/ -bool syncRaftProgressIsPaused(SSyncRaft* pRaft, int i); +bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); + +static FORCE_INLINE void syncRaftProgressPause(SSyncRaftProgress* progress) { + progress->paused = true; +} + +static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { + return progress->nextIndex; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_REPLICATE; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_SNAPSHOT; +} + +static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { + return progress->state == PROGRESS_PROBE; +} + +static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) { + return progress->recentActive; +} + +static FORCE_INLINE bool syncRaftProgressUpdateSendTick(SSyncRaftProgress* progress, SyncTick current) { + return progress->lastSendTick = current; +} void syncRaftProgressFailure(SSyncRaft* pRaft, int i); @@ -159,7 +192,7 @@ void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights); void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); -SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i); + SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i); @@ -171,11 +204,9 @@ bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i); void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i); -bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i); -void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); -RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i); +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); #endif diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h new file mode 100644 index 0000000000..e457063980 --- /dev/null +++ b/source/libs/sync/inc/raft_replication.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_REPLICATION_H +#define TD_SYNC_RAFT_REPLICATION_H + +#include "sync.h" +#include "syncInt.h" +#include "sync_type.h" + +int syncRaftReplicate(SSyncRaft* pRaft, int i); + +#endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index f9632f6ae8..130243a72a 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -16,12 +16,18 @@ #ifndef _TD_LIBS_SYNC_TYPE_H #define _TD_LIBS_SYNC_TYPE_H +#define SYNC_NON_NODE_ID -1 +#define SYNC_NON_TERM 0 + typedef int32_t SyncTime; +typedef uint32_t SyncTick; typedef struct SSyncRaft SSyncRaft; typedef struct SSyncRaftLog SSyncRaftLog; +typedef struct SSyncRaftEntry SSyncRaftEntry; + #ifndef MIN #define MIN(x, y) (((x) < (y)) ? (x) : (y)) #endif @@ -32,13 +38,18 @@ typedef struct SSyncRaftLog SSyncRaftLog; typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, - SYNC_RAFT_CAMPAIGN_ELECTION = 1, - SYNC_RAFT_CAMPAIGN_TRANSFER = 3, + SYNC_RAFT_CAMPAIGN_ELECTION = 1, + SYNC_RAFT_CAMPAIGN_TRANSFER = 2, } SyncRaftElectionType; typedef enum { + // the init vote resp status SYNC_RAFT_VOTE_RESP_UNKNOWN = 0, + + // grant the vote request SYNC_RAFT_VOTE_RESP_GRANT = 1, + + //reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, } SyncRaftVoteRespType; diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 6e8e359305..dca5c4cf08 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -29,6 +29,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static int triggerAll(SSyncRaft* pRaft); + static void tickElection(SSyncRaft* pRaft); static void tickHeartbeat(SSyncRaft* pRaft); @@ -95,8 +97,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - syncDebug("from %d, to %d, type:%d, term:%" PRId64 ", state:%d", - pMsg->from, pMsg->to, pMsg->msgType, pMsg->term, pRaft->state); + syncDebug("from %d, type:%d, term:%" PRId64 ", state:%d", + pMsg->from, pMsg->msgType, pMsg->term, pRaft->state); if (preHandleMessage(pRaft, pMsg)) { syncFreeMessage(pMsg); @@ -117,6 +119,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { } int32_t syncRaftTick(SSyncRaft* pRaft) { + pRaft->currentTick += 1; return 0; } @@ -168,12 +171,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { pRaft->leaderId = pRaft->leaderId; pRaft->state = TAOS_SYNC_ROLE_LEADER; // TODO: check if there is pending config log + int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log); + if (nPendingConf > 1) { + syncFatal("unexpected multiple uncommitted config entry"); + } + if (nPendingConf == 1) { + pRaft->hasPendingConf = true; + } syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); -} -void syncRaftTriggerReplicate(SSyncRaft* pRaft) { + // after become leader, send initial heartbeat + syncRaftTriggerHeartbeat(pRaft); +} +void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { + triggerAll(pRaft); } void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { @@ -219,7 +232,7 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc } /** - * pre-handle message, return true is no need to continue + * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. **/ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -230,9 +243,11 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (pMsg->term > pRaft->term) { return preHandleNewTermMessage(pRaft, pMsg); + } else if (pMsg->term < pRaft->term) { + return preHandleOldTermMessage(pRaft, pMsg); } - return preHandleOldTermMessage(pRaft, pMsg);; + return false; } static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -240,6 +255,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) RaftMessageType msgType = pMsg->msgType; if (msgType == RAFT_MSG_VOTE) { + // TODO leaderId = SYNC_NON_NODE_ID; } @@ -263,7 +279,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) } static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - + // TODO // if receive old term message, no need to continue return true; } @@ -273,7 +289,7 @@ static int convertClear(SSyncRaft* pRaft) { } static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - convertClear(pRaft); + return 0; } @@ -290,6 +306,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { } if (msgType == RAFT_MSG_VOTE_RESP) { + syncRaftHandleVoteRespMessage(pRaft, pMsg); return 0; } return 0; @@ -324,6 +341,22 @@ static void tickHeartbeat(SSyncRaft* pRaft) { } +/** + * trigger I/O requests for newly appended log entries or heartbeats. + **/ +static int triggerAll(SSyncRaft* pRaft) { + assert(pRaft->state == TAOS_SYNC_ROLE_LEADER); + int i; + + for (i = 0; i < pRaft->cluster.replica; ++i) { + if (i == pRaft->cluster.selfIndex) { + continue; + } + + + } +} + static void abortLeaderTransfer(SSyncRaft* pRaft) { pRaft->leadTransferee = SYNC_NON_NODE_ID; } @@ -343,5 +376,5 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { abortLeaderTransfer(pRaft); - pRaft->pendingConf = false; -} \ No newline at end of file + pRaft->hasPendingConf = false; +} diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c index bb4a7541c2..4ffb8d0943 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/raft_election.c @@ -62,7 +62,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, - nodeId, term, cType, lastIndex, lastTerm); + term, cType, lastIndex, lastTerm); if (pMsg == NULL) { continue; } diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index a575c5df1a..87ef468d57 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -15,6 +15,7 @@ #include "syncInt.h" #include "raft.h" +#include "raft_configuration.h" #include "raft_log.h" #include "raft_message.h" @@ -31,12 +32,12 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); grant = canGrantVoteMessage(pRaft, pMsg); - pRespMsg = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->to, pMsg->vote.cType, !grant); + pRespMsg = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->vote.cType, !grant); if (pRespMsg == NULL) { return 0; } syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d" \ - "[logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] at term %" PRId64 "", + "[logterm: %" PRId64 ", index: %" PRId64 "] at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, pRaft->voteFor, grant ? "grant" : "reject", pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); @@ -49,7 +50,7 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) { return false; } - if (!syncRaftLogIsUptodate(pRaft, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { + if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { return false; } diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index a155f0fe63..6e88b03b5a 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -15,6 +15,7 @@ #include "syncInt.h" #include "raft.h" +#include "raft_configuration.h" #include "raft_message.h" int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -45,8 +46,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { - syncRaftBecomeLeader(pRaft); - syncRaftTriggerReplicate(pRaft); + syncRaftBecomeLeader(pRaft); } return 0; diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index f93595e9f3..ee51fcbef3 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -37,4 +37,13 @@ int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) { bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog) { return pLog->commitIndex > pLog->appliedIndex; +} + +SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) { + return SYNC_NON_TERM; +} + +int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, + SSyncRaftEntry **ppEntries, int *n) { + return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/raft_progress.c index 458f829394..8133b670ff 100644 --- a/source/libs/sync/src/raft_progress.c +++ b/source/libs/sync/src/raft_progress.c @@ -22,7 +22,6 @@ static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state); static void resumeProgress(SSyncRaftProgress* progress); -static void pauseProgress(SSyncRaftProgress* progress); int syncRaftProgressCreate(SSyncRaft* pRaft) { @@ -58,11 +57,6 @@ bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) { return updated; } -void syncRaftProgressOptimisticNextIndex(SSyncRaft* pRaft, int i, SyncIndex nextIndex) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - pRaft->leaderState.progress[i].nextIndex = nextIndex + 1; -} - bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i, SyncIndex rejected, SyncIndex lastIndex) { assert(i >= 0 && i < pRaft->leaderState.nProgress); @@ -103,15 +97,7 @@ static void resumeProgress(SSyncRaftProgress* progress) { progress->paused = false; } -static void pauseProgress(SSyncRaftProgress* progress) { - progress->paused = true; -} - -bool syncRaftProgressIsPaused(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - +bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { switch (progress->state) { case PROGRESS_PROBE: return progress->paused; diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c new file mode 100644 index 0000000000..02d9804f7e --- /dev/null +++ b/source/libs/sync/src/raft_replication.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "raft.h" +#include "raft_log.h" +#include "raft_progress.h" +#include "raft_replication.h" + +static int sendSnapshot(SSyncRaft* pRaft, int i); +static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term); + +int syncRaftReplicate(SSyncRaft* pRaft, int i) { + assert(pRaft->state == TAOS_SYNC_ROLE_LEADER); + assert(i >= 0 && i < pRaft->leaderState.nProgress); + + SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; + SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + if (syncRaftProgressIsPaused(progress)) { + syncInfo("node %d paused", nodeId); + return 0; + } + + SyncIndex nextIndex = syncRaftProgressNextIndex(progress); + SyncIndex prevIndex = nextIndex - 1; + SyncTerm prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); + + if (prevTerm == SYNC_NON_TERM && !syncRaftProgressInSnapshot(progress)) { + goto send_snapshot; + } + +send_snapshot: + if (syncRaftProgressRecentActive(progress)) { + /* Only send a snapshot when we have heard from the server */ + return sendSnapshot(pRaft, i); + } else { + /* Send empty AppendEntries RPC when we haven't heard from the server */ + prevIndex = syncRaftLogLastIndex(pRaft->log); + prevTerm = syncRaftLogLastTerm(pRaft->log); + return sendAppendEntries(pRaft, i, prevIndex, prevTerm); + } +} + +static int sendSnapshot(SSyncRaft* pRaft, int i) { + return 0; +} + +static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) { + SyncIndex nextIndex = prevIndex + 1; + SSyncRaftEntry *entries; + int nEntry; + SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]); + SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry); + + SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, + prevIndex, prevTerm, pRaft->log->commitIndex, + nEntry, entries); + + if (msg == NULL) { + return 0; + } + + pRaft->io.send(msg, pNode); + + if (syncRaftProgressInReplicate(progress)) { + SyncIndex lastIndex = nextIndex + nEntry; + syncRaftProgressOptimisticNextIndex(progress, lastIndex); + syncRaftInflightAdd(&progress->inflights, lastIndex); + } else if (syncRaftProgressInProbe(progress)) { + syncRaftProgressPause(progress); + } else { + + } + + syncRaftProgressUpdateSendTick(progress, pRaft->currentTick); + + return 0; +} \ No newline at end of file -- GitLab