/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "syncAppendEntriesReply.h" #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i] // /\ \/ /\ m.msuccess \* successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] // \/ /\ \lnot m.msuccess \* not successful // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = // Max({nextIndex[i][j] - 1, 1})] // /\ UNCHANGED <> // /\ Discard(m) // /\ UNCHANGED <> // int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; // print log syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); return 0; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term); syncNodeEventLog(ths, logBuf); return 0; } if (gRaftDetailLog) { syncNodeEventLog(ths, "recv sync-append-entries-reply, before"); } syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); // } if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); syncNodeErrorLog(ths, logBuf); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit syncMaybeAdvanceCommitIndex(ths); } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; } else { nextIndex = SYNC_INDEX_BEGIN; } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } if (gRaftDetailLog) { syncNodeEventLog(ths, "recv sync-append-entries-reply, after"); } syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); return ret; } // only start once static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, SyncAppendEntriesReply* pMsg) { // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); if (snapshotSenderIsStart(pSender)) { do { char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender already start"); syncNodeErrorLog(ths, eventLog); taosMemoryFree(eventLog); } while (0); return; } SSnapshot snapshot = { .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; void* pReader = NULL; SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); ASSERT(code == 0); if (pMsg->privateTerm < pSender->privateTerm) { ASSERT(pReader != NULL); snapshotSenderStart(pSender, readerParam, snapshot, pReader); } else { if (pReader != NULL) { ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); } } } int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term); syncNodeEventLog(ths, logBuf); return -1; } // error term if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); syncNodeErrorLog(ths, logBuf); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { SyncIndex newNextIndex = pMsg->matchIndex + 1; SyncIndex newMatchIndex = pMsg->matchIndex; bool needStartSnapshot = false; if (newMatchIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, newMatchIndex)) { needStartSnapshot = true; } if (!needStartSnapshot) { // update next-index, match-index syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); // maybe commit if (ths->state == TAOS_SYNC_STATE_LEADER) { syncMaybeAdvanceCommitIndex(ths); } } else { // start snapshot SSnapshot oldSnapshot; ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1); syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg); // term maybe not ok? syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); } // event log, update next-index do { char host[64]; int16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "reset next-index:%ld, match-index:%ld for %s:%d", newNextIndex, newMatchIndex, host, port); syncNodeEventLog(ths, logBuf); } while (0); } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; bool needStartSnapshot = false; if (nextIndex >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex)) { needStartSnapshot = true; } if (nextIndex - 1 >= SYNC_INDEX_BEGIN && !ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) { needStartSnapshot = true; } if (!needStartSnapshot) { // do nothing } else { SSyncRaftEntry* pEntry; int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); ASSERT(code == 0); syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; // update nextIndex to sentryIndex if (nextIndex <= sentryIndex) { nextIndex = sentryIndex; } } } else { nextIndex = SYNC_INDEX_BEGIN; } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); // event log, update next-index do { char host[64]; int16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); SyncIndex newNextIndex = nextIndex; SyncIndex newMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "reset2 next-index:%ld, match-index:%ld for %s:%d", newNextIndex, newMatchIndex, host, port); syncNodeEventLog(ths, logBuf); } while (0); } return 0; } int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; // print log syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped"); return 0; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term); syncNodeEventLog(ths, logBuf); return 0; } if (gRaftDetailLog) { syncNodeEventLog(ths, "recv sync-append-entries-reply, before"); } syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); // } if (pMsg->term > ths->pRaftStore->currentTerm) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term); syncNodeErrorLog(ths, logBuf); return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); if (gRaftDetailLog) { sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success); } // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit if (ths->state == TAOS_SYNC_STATE_LEADER) { syncMaybeAdvanceCommitIndex(ths); } } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); if (gRaftDetailLog) { sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success); } // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = SYNC_INDEX_INVALID, .lastApplyTerm = 0, .lastConfigIndex = SYNC_INDEX_INVALID}; void* pReader = NULL; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { // has snapshot ASSERT(pReader != NULL); SSnapshotParam readerParam = {.start = 0, .end = snapshot.lastApplyIndex}; snapshotSenderStart(pSender, readerParam, snapshot, pReader); } else { // no snapshot if (pReader != NULL) { ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); } } SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; // update nextIndex to sentryIndex if (nextIndex <= sentryIndex) { nextIndex = sentryIndex; } } else { nextIndex = SYNC_INDEX_BEGIN; } syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); if (gRaftDetailLog) { sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success); } } if (gRaftDetailLog) { syncNodeEventLog(ths, "recv sync-append-entries-reply, after"); } syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); return 0; }