/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "raft.h" #include "raft_log.h" #include "sync_raft_progress.h" #include "sync_raft_progress_tracker.h" #include "sync.h" #include "syncInt.h" static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state); static void probeAcked(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress); void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight); if (inflights == NULL) { return; } *progress = (SSyncRaftProgress) { .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .inflights = inflights, .isLearner = false, .state = PROGRESS_STATE_PROBE, }; } /** * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the * index acked by it. The method returns false if the given n index comes from * an outdated message. Otherwise it updates the progress and returns true. **/ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool updated = false; if (progress->matchIndex < lastIndex) { progress->matchIndex = lastIndex; updated = true; probeAcked(progress); } progress->nextIndex = MAX(progress->nextIndex, lastIndex + 1); return updated; } bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint) { if (progress->state == PROGRESS_STATE_REPLICATE) { /** * the rejection must be stale if the progress has matched and "rejected" * is smaller than "match". **/ if (rejected <= progress->matchIndex) { syncDebug("match index is up to date,ignore"); return false; } /* directly decrease next to match + 1 */ progress->nextIndex = progress->matchIndex + 1; return true; } /** * The rejection must be stale if "rejected" does not match next - 1. This * is because non-replicating followers are probed one entry at a time. **/ if (rejected != progress->nextIndex - 1) { syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" , rejected, progress->nextIndex); return false; } progress->nextIndex = MAX(MIN(rejected, matchHint + 1), 1); progress->probeSent = false; return true; } /** * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. * This is done when a node has rejected recent MsgApps, is currently waiting * for a snapshot, or has reached the MaxInflightMsgs limit. In normal * operation, this is false. A throttled node will be contacted less frequently * until it has reached a state in which it's able to accept a steady stream of * log entries again. **/ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { switch (progress->state) { case PROGRESS_STATE_PROBE: return progress->probeSent; case PROGRESS_STATE_REPLICATE: return syncRaftInflightFull(progress->inflights); case PROGRESS_STATE_SNAPSHOT: return true; default: syncFatal("error sync state:%d", progress->state); } } bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } /** * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, * optionally and if larger, the index of the pending snapshot. **/ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { /** * If the original state is ProgressStateSnapshot, progress knows that * the pending snapshot has been sent to this peer successfully, then * probes from pendingSnapshot + 1. **/ if (progress->state == PROGRESS_STATE_SNAPSHOT) { SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1); } else { resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = progress->matchIndex + 1; } } /** * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. **/ void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { resetProgressState(progress, PROGRESS_STATE_REPLICATE); progress->nextIndex = progress->matchIndex + 1; } void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = snapshotIndex; } /** * ResetState moves the Progress into the specified State, resetting ProbeSent, * PendingSnapshot, and Inflights. **/ static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { progress->probeSent = false; progress->pendingSnapshotIndex = 0; progress->state = state; syncRaftInflightReset(progress->inflights); } /** * probeAcked is called when this peer has accepted an append. It resets * ProbeSent to signal that additional append messages should be sent without * further delay. **/ static void probeAcked(SSyncRaftProgress* progress) { progress->probeSent = false; } #if 0 SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) { return pRaft->leaderState.progress[i].nextIndex; } SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) { return pRaft->leaderState.progress[i].matchIndex; } void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) { pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft); } void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) { pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft); } bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); bool prev = progress->recentRecv; progress->recentRecv = false; return prev; } void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) { pRaft->leaderState.progress[i].recentRecv = true; } bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { return pRaft->leaderState.progress[i].recentRecv; } void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); } void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); if (progress->state == PROGRESS_STATE_SNAPSHOT) { assert(progress->pendingSnapshotIndex > 0); SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); } else { resetProgressState(progress, PROGRESS_STATE_PROBE); progress->nextIndex = progress->matchIndex + 1; } } void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE); pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; } void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); progress->pendingSnapshotIndex = 0; progress->state = PROGRESS_STATE_PROBE; } RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { return pRaft->leaderState.progress[i].state; } #endif