/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef _TD_LIBS_SYNC_APPEND_ENTRIES_H #define _TD_LIBS_SYNC_APPEND_ENTRIES_H #ifdef __cplusplus extern "C" { #endif #include #include #include #include "syncInt.h" #include "syncMessage.h" #include "taosdef.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0 // \/ /\ m.mprevLogIndex > 0 // /\ m.mprevLogIndex <= Len(log[i]) // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term // IN /\ m.mterm <= currentTerm[i] // /\ \/ /\ \* reject request // \/ m.mterm < currentTerm[i] // \/ /\ m.mterm = currentTerm[i] // /\ state[i] = Follower // /\ \lnot logOk // /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i], // msuccess |-> FALSE, // mmatchIndex |-> 0, // msource |-> i, // mdest |-> j], // m) // /\ UNCHANGED <> // \/ \* return to follower state // /\ m.mterm = currentTerm[i] // /\ state[i] = Candidate // /\ state' = [state EXCEPT ![i] = Follower] // /\ UNCHANGED <> // \/ \* accept request // /\ m.mterm = currentTerm[i] // /\ state[i] = Follower // /\ logOk // /\ LET index == m.mprevLogIndex + 1 // IN \/ \* already done with request // /\ \/ m.mentries = << >> // \/ /\ m.mentries /= << >> // /\ Len(log[i]) >= index // /\ log[i][index].term = m.mentries[1].term // \* This could make our commitIndex decrease (for // \* example if we process an old, duplicated request), // \* but that doesn't really affect anything. // /\ commitIndex' = [commitIndex EXCEPT ![i] = // m.mcommitIndex] // /\ Reply([mtype |-> AppendEntriesResponse, // mterm |-> currentTerm[i], // msuccess |-> TRUE, // mmatchIndex |-> m.mprevLogIndex + // Len(m.mentries), // msource |-> i, // mdest |-> j], // m) // /\ UNCHANGED <> // \/ \* conflict: remove 1 entry // /\ m.mentries /= << >> // /\ Len(log[i]) >= index // /\ log[i][index].term /= m.mentries[1].term // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> // log[i][index2]] // IN log' = [log EXCEPT ![i] = new] // /\ UNCHANGED <> // \/ \* no conflict: append entry // /\ m.mentries /= << >> // /\ Len(log[i]) = m.mprevLogIndex // /\ log' = [log EXCEPT ![i] = // Append(log[i], m.mentries[1])] // /\ UNCHANGED <> // /\ UNCHANGED <> // int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); #ifdef __cplusplus } #endif #endif /*_TD_LIBS_SYNC_APPEND_ENTRIES_H*/