syncAppendEntriesReply.c 7.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
17
#include "syncCommit.h"
M
Minghao Li 已提交
18 19 20 21
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
22
#include "syncSnapshot.h"
M
Minghao Li 已提交
23 24
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26 27 28 29 30 31 32 33 34 35 36 37
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
//    /\ m.mterm = currentTerm[i]
//    /\ \/ /\ m.msuccess \* successful
//          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
//          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
//       \/ /\ \lnot m.msuccess \* not successful
//          /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
//                               Max({nextIndex[i][j] - 1, 1})]
//          /\ UNCHANGED <<matchIndex>>
//    /\ Discard(m)
//    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
/**
 * Process an AppendEntries reply (TLA+ HandleAppendEntriesResponse).
 *
 * - A reply whose term is lower than the local current term is dropped as stale.
 * - A reply whose term is higher than the local current term is logged as an
 *   error and dropped; the term is NOT adopted here, on the reasoning that a
 *   reply can only carry a term this node previously sent a request for.
 * - On success, nextIndex for the sender becomes matchIndex + 1, matchIndex is
 *   recorded, and syncMaybeAdvanceCommitIndex() is invoked.
 * - On failure, nextIndex for the sender is decremented by one, but never
 *   below SYNC_INDEX_BEGIN.
 *
 * @param ths   local sync node whose next/match index tables are updated
 * @param pMsg  the received reply
 * @return always 0
 */
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;

  // Terms are 64-bit: use the <inttypes.h> macros so the format always matches the argument width.
  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
  syncAppendEntriesReplyLog2(logBuf, pMsg);

  // drop stale response
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64, pMsg->term,
           ths->pRaftStore->currentTerm);
    return ret;
  }

  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);

  // A reply for term T implies this node sent a request in term T, so a reply
  // carrying a newer term is treated as an error instead of a term update.
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    char errBuf[128] = {0};
    snprintf(errBuf, sizeof(errBuf),
             "syncNodeOnAppendEntriesReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term,
             ths->pRaftStore->currentTerm);
    syncNodeLog2(errBuf, ths);
    sError("%s", errBuf);
    return ret;
  }

  assert(pMsg->term == ths->pRaftStore->currentTerm);

  if (pMsg->success) {
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);

    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
    syncMaybeAdvanceCommitIndex(ths);

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));

    // notice! int64, uint64
    // Step back one entry, clamped at SYNC_INDEX_BEGIN.
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
  }

  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);

  return ret;
}
M
Minghao Li 已提交
98

99 100 101
/**
 * Process an AppendEntries reply, snapshot-aware variant.
 *
 * - The reply is ignored when syncNodeInRaftGroup() reports that the sender is
 *   not in the group, and when its term is lower than the local current term.
 * - A reply whose term is higher than the local current term is logged as an
 *   error and dropped; the term is NOT adopted here, on the reasoning that a
 *   reply can only carry a term this node previously sent a request for.
 * - On success, nextIndex for the sender becomes matchIndex + 1, matchIndex is
 *   recorded, and syncMaybeAdvanceCommitIndex() is invoked if this node is in
 *   TAOS_SYNC_STATE_LEADER.
 * - On failure, nextIndex is decremented (never below SYNC_INDEX_BEGIN). If
 *   syncNodeHasSnapshot() is true, the snapshot sender for the replying
 *   replica is started (unless already started in the current term) and
 *   nextIndex is raised to at least the sender's snapshot lastApplyIndex + 1.
 *
 * @param ths   local sync node whose next/match index tables are updated
 * @param pMsg  the received reply
 * @return always 0
 */
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;

  // print log
  // Terms and indexes are 64-bit: use the <inttypes.h> macros (with explicit
  // casts) so every format specifier matches its argument on all targets.
  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, term:%" PRIu64, ths->pRaftStore->currentTerm);
  syncAppendEntriesReplyLog2(logBuf, pMsg);

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    sInfo("maybe already dropped");
    return ret;
  }

  // drop stale response
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64,
           pMsg->term, ths->pRaftStore->currentTerm);
    return ret;
  }

  syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
  syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
  {
    // Zero-initialised so the trace below never reads indeterminate bytes if
    // the callback leaves a field untouched.
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
    sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%" PRId64
           ", snapshot.lastApplyTerm:%" PRIu64,
           (int64_t)snapshot.lastApplyIndex, (uint64_t)snapshot.lastApplyTerm);
  }

  // A reply for term T implies this node sent a request in term T, so a reply
  // carrying a newer term is treated as an error instead of a term update.
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    char errBuf[128] = {0};
    snprintf(errBuf, sizeof(errBuf),
             "recv SyncAppendEntriesReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term,
             ths->pRaftStore->currentTerm);
    syncNodeLog2(errBuf, ths);
    sError("%s", errBuf);
    return ret;
  }

  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);

  if (pMsg->success) {
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
    sTrace("update next index:%" PRId64 ", success:%d", (int64_t)(pMsg->matchIndex + 1), pMsg->success);

    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
    if (ths->state == TAOS_SYNC_STATE_LEADER) {
      syncMaybeAdvanceCommitIndex(ths);
    }

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
    sTrace("begin to update next index:%" PRId64 ", success:%d", (int64_t)nextIndex, pMsg->success);

    // notice! int64, uint64
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;

      // has snapshot
      if (syncNodeHasSnapshot(ths)) {
        // get sender: the snapshot sender stored at the same slot as the replying replica's id
        SSyncSnapshotSender* pSender = NULL;
        for (int i = 0; i < ths->replicaNum; ++i) {
          if (syncUtilSameId(&(pMsg->srcId), &((ths->replicasId)[i]))) {
            pSender = (ths->senders)[i];
            break;
          }
        }
        ASSERT(pSender != NULL);

        SyncIndex sentryIndex;
        if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
          // already start
          sentryIndex = pSender->snapshot.lastApplyIndex;
          sTrace("sending snapshot already start: pSender->term:%" PRIu64 ", ths->pRaftStore->currentTerm:%" PRIu64,
                 (uint64_t)pSender->term, ths->pRaftStore->currentTerm);

        } else {
          // start send snapshot, first time
          sTrace("sending snapshot start first: pSender->term:%" PRIu64 ", ths->pRaftStore->currentTerm:%" PRIu64,
                 (uint64_t)pSender->term, ths->pRaftStore->currentTerm);

          snapshotSenderDoStart(pSender);
          pSender->start = true;
          // read after snapshotSenderDoStart(), same order as the original code
          sentryIndex = pSender->snapshot.lastApplyIndex;
        }

        // update nextIndex to sentryIndex + 1
        if (nextIndex <= sentryIndex) {
          nextIndex = sentryIndex + 1;
        }
      }

    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }

    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
    sTrace("update next index:%" PRId64 ", success:%d", (int64_t)nextIndex, pMsg->success);
  }

  syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
  syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
  {
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
    sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%" PRId64
           ", snapshot.lastApplyTerm:%" PRIu64,
           (int64_t)snapshot.lastApplyIndex, (uint64_t)snapshot.lastApplyTerm);
  }

  return ret;
}