syncAppendEntriesReply.c 8.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
17
#include "syncCommit.h"
M
Minghao Li 已提交
18 19
#include "syncIndexMgr.h"
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
M
Minghao Li 已提交
24 25
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
26

M
Minghao Li 已提交
27 28 29 30 31 32 33 34 35 36 37 38
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
//    /\ m.mterm = currentTerm[i]
//    /\ \/ /\ m.msuccess \* successful
//          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
//          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
//       \/ /\ \lnot m.msuccess \* not successful
//          /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
//                               Max({nextIndex[i][j] - 1, 1})]
//          /\ UNCHANGED <<matchIndex>>
//    /\ Discard(m)
//    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
M
Minghao Li 已提交
39
//
M
Minghao Li 已提交
40 41
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;
M
Minghao Li 已提交
42

43
  char logBuf[128] = {0};
M
Minghao Li 已提交
44 45
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesReplyLog2(logBuf, pMsg);
M
Minghao Li 已提交
46 47

  if (pMsg->term < ths->pRaftStore->currentTerm) {
M
Minghao Li 已提交
48 49
    sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
           ths->pRaftStore->currentTerm);
M
Minghao Li 已提交
50 51 52
    return ret;
  }

M
Minghao Li 已提交
53 54 55
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);

M
Minghao Li 已提交
56 57 58 59 60
  // no need this code, because if I receive reply.term, then I must have sent for that term.
  //  if (pMsg->term > ths->pRaftStore->currentTerm) {
  //    syncNodeUpdateTerm(ths, pMsg->term);
  //  }

M
Minghao Li 已提交
61
  if (pMsg->term > ths->pRaftStore->currentTerm) {
62
    char logBuf[128] = {0};
M
Minghao Li 已提交
63 64 65 66 67 68 69
    snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term,
             ths->pRaftStore->currentTerm);
    syncNodeLog2(logBuf, ths);
    sError("%s", logBuf);
    return ret;
  }

M
Minghao Li 已提交
70
  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
M
Minghao Li 已提交
71 72

  if (pMsg->success) {
M
Minghao Li 已提交
73
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
M
Minghao Li 已提交
74 75
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);

M
Minghao Li 已提交
76
    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
M
Minghao Li 已提交
77 78 79
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
M
Minghao Li 已提交
80
    syncMaybeAdvanceCommitIndex(ths);
M
Minghao Li 已提交
81 82 83

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
M
Minghao Li 已提交
84 85

    // notice! int64, uint64
M
Minghao Li 已提交
86 87 88 89 90 91 92 93
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
  }

M
Minghao Li 已提交
94 95 96
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);

M
Minghao Li 已提交
97 98
  return ret;
}
M
Minghao Li 已提交
99

100 101 102
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;

103
  // print log
104
  syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
105

106
  // if already drop replica, do not process
M
Minghao Li 已提交
107
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
108 109
    syncNodeEventLog(ths, "recv sync-append-entries-reply,  maybe replica already dropped");
    return 0;
110 111
  }

112
  // drop stale response
113
  if (pMsg->term < ths->pRaftStore->currentTerm) {
114 115 116 117
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term);
    syncNodeEventLog(ths, logBuf);
    return 0;
118 119
  }

120
  if (gRaftDetailLog) {
121
    syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
M
Minghao Li 已提交
122
  }
123 124
  syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
  syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
125 126 127 128 129 130 131

  // no need this code, because if I receive reply.term, then I must have sent for that term.
  //  if (pMsg->term > ths->pRaftStore->currentTerm) {
  //    syncNodeUpdateTerm(ths, pMsg->term);
  //  }

  if (pMsg->term > ths->pRaftStore->currentTerm) {
132 133 134 135
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
    syncNodeErrorLog(ths, logBuf);
    return -1;
136 137
  }

138
  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
139 140 141 142

  if (pMsg->success) {
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
143 144 145 146

    if (gRaftDetailLog) {
      sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
    }
147 148 149 150 151

    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
152 153 154
    if (ths->state == TAOS_SYNC_STATE_LEADER) {
      syncMaybeAdvanceCommitIndex(ths);
    }
155 156 157

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
158 159 160
    if (gRaftDetailLog) {
      sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
    }
161 162 163 164

    // notice! int64, uint64
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
165

166 167 168 169
      // get sender
      SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
      ASSERT(pSender != NULL);

M
Minghao Li 已提交
170 171 172 173 174 175 176 177
      SSnapshot snapshot;
      void*     pReader = NULL;
      ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
      if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
          !snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
        // has snapshot
        ASSERT(pReader != NULL);
        snapshotSenderStart(pSender, snapshot, pReader);
178

M
Minghao Li 已提交
179 180 181
        char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
        syncNodeEventLog(ths, eventLog);
        taosMemoryFree(eventLog);
M
Minghao Li 已提交
182 183 184 185 186 187

      } else {
        // no snapshot
        if (pReader != NULL) {
          ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
        }
188 189
      }

M
Minghao Li 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
      /*
            bool      hasSnapshot = syncNodeHasSnapshot(ths);
            SSnapshot snapshot;
            ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);

            // start sending snapshot first time
            // start here, stop by receiver
            if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
                pMsg->privateTerm < pSender->privateTerm) {
              snapshotSenderStart(pSender);

              char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
              syncNodeEventLog(ths, eventLog);
              taosMemoryFree(eventLog);
            }
      */

207 208 209 210 211
      SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;

      // update nextIndex to sentryIndex
      if (nextIndex <= sentryIndex) {
        nextIndex = sentryIndex;
212 213
      }

214 215 216
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
M
Minghao Li 已提交
217

218
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
219 220 221
    if (gRaftDetailLog) {
      sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
    }
222 223
  }

224 225 226 227 228
  if (gRaftDetailLog) {
    syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
  }
  syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
  syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
229 230 231

  return ret;
}