/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntriesReply.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"

// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
//    /\ m.mterm = currentTerm[i]
//    /\ \/ /\ m.msuccess \* successful
//          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
//          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
//       \/ /\ \lnot m.msuccess \* not successful
//          /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
//                               Max({nextIndex[i][j] - 1, 1})]
//          /\ UNCHANGED <<matchIndex>>
//    /\ Discard(m)
//    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//

/**
 * Handle an AppendEntries reply received from a peer.
 *
 * The reply is only applied when its term equals this node's current term
 * (the `m.mterm = currentTerm[i]` precondition of the spec above); replies
 * from any other term are logged and dropped.
 *
 * On a successful reply the peer's nextIndex is set to matchIndex + 1, its
 * matchIndex is set to the reported matchIndex, and
 * syncNodeMaybeAdvanceCommitIndex() is invoked. On a failed reply the peer's
 * nextIndex is decremented by one, never going below SYNC_INDEX_BEGIN; the
 * peer's matchIndex is left untouched.
 *
 * @param ths  the local sync node whose per-peer index tables are updated
 * @param pMsg the reply message; pMsg->srcId identifies the replying peer
 * @return always 0, including when the reply is dropped
 */
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;
  syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);

  // A reply from an older term is stale: ignore it.
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
    return ret;
  }

  // A reply from a newer term must not be applied to the index tables either.
  // This used to be enforced only by an assert(), which aborts the process on
  // a peer-supplied value and, when assertions are compiled out, lets the
  // mismatched reply update nextIndex/matchIndex. Drop the message instead.
  // NOTE(review): this handler does not adopt the newer term itself
  // (syncNodeUpdateTerm); confirm that the term update happens elsewhere.
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    sTrace("DropNewerTermResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
    return ret;
  }

  // From here on pMsg->term == ths->pRaftStore->currentTerm.
  if (pMsg->success) {
    // nextIndex = reply.matchIndex + 1
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);

    // matchIndex = reply.matchIndex
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // the peer's progress changed, so the commit index may be able to advance
    syncNodeMaybeAdvanceCommitIndex(ths);

  } else {
    // Step nextIndex back by one entry, clamped at SYNC_INDEX_BEGIN
    // (the Max({nextIndex - 1, 1}) clause of the spec).
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
  }

  return ret;
}