syncAppendEntriesReply.c 8.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
17
#include "syncCommit.h"
M
Minghao Li 已提交
18 19
#include "syncIndexMgr.h"
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaftCfg.h"
M
Minghao Li 已提交
21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
M
Minghao Li 已提交
24 25
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
26

M
Minghao Li 已提交
27 28 29 30 31 32 33 34 35 36 37 38
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
//    /\ m.mterm = currentTerm[i]
//    /\ \/ /\ m.msuccess \* successful
//          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
//          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
//       \/ /\ \lnot m.msuccess \* not successful
//          /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
//                               Max({nextIndex[i][j] - 1, 1})]
//          /\ UNCHANGED <<matchIndex>>
//    /\ Discard(m)
//    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
M
Minghao Li 已提交
39
//
M
Minghao Li 已提交
40 41
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;
M
Minghao Li 已提交
42

43
  char logBuf[128] = {0};
M
Minghao Li 已提交
44 45
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesReplyLog2(logBuf, pMsg);
M
Minghao Li 已提交
46 47

  if (pMsg->term < ths->pRaftStore->currentTerm) {
M
Minghao Li 已提交
48 49
    sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
           ths->pRaftStore->currentTerm);
M
Minghao Li 已提交
50 51 52
    return ret;
  }

M
Minghao Li 已提交
53 54 55
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);

M
Minghao Li 已提交
56 57 58 59 60
  // no need this code, because if I receive reply.term, then I must have sent for that term.
  //  if (pMsg->term > ths->pRaftStore->currentTerm) {
  //    syncNodeUpdateTerm(ths, pMsg->term);
  //  }

M
Minghao Li 已提交
61
  if (pMsg->term > ths->pRaftStore->currentTerm) {
62
    char logBuf[128] = {0};
M
Minghao Li 已提交
63 64 65 66 67 68 69
    snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term,
             ths->pRaftStore->currentTerm);
    syncNodeLog2(logBuf, ths);
    sError("%s", logBuf);
    return ret;
  }

M
Minghao Li 已提交
70 71 72
  assert(pMsg->term == ths->pRaftStore->currentTerm);

  if (pMsg->success) {
M
Minghao Li 已提交
73
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
M
Minghao Li 已提交
74 75
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);

M
Minghao Li 已提交
76
    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
M
Minghao Li 已提交
77 78 79
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
M
Minghao Li 已提交
80
    syncMaybeAdvanceCommitIndex(ths);
M
Minghao Li 已提交
81 82 83

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
M
Minghao Li 已提交
84 85

    // notice! int64, uint64
M
Minghao Li 已提交
86 87 88 89 90 91 92 93
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
  }

M
Minghao Li 已提交
94 95 96
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
  syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);

M
Minghao Li 已提交
97 98
  return ret;
}
M
Minghao Li 已提交
99

100 101 102
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  int32_t ret = 0;

103
  // print log
104
  char logBuf[128] = {0};
105 106
  snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId,
           ths->pRaftStore->currentTerm);
107 108
  syncAppendEntriesReplyLog2(logBuf, pMsg);

109
  // if already drop replica, do not process
M
Minghao Li 已提交
110
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
M
Minghao Li 已提交
111
    sInfo("recv SyncAppendEntriesReply,  maybe replica already dropped");
112 113 114
    return ret;
  }

115
  // drop stale response
116
  if (pMsg->term < ths->pRaftStore->currentTerm) {
117
    sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
118 119 120 121
           ths->pRaftStore->currentTerm);
    return ret;
  }

122
  syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
M
Minghao Li 已提交
123
  syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
124
  if (gRaftDetailLog) {
M
Minghao Li 已提交
125 126 127 128 129
    SSnapshot snapshot;
    ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
    sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
           snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  }
130 131 132 133 134 135 136 137

  // no need this code, because if I receive reply.term, then I must have sent for that term.
  //  if (pMsg->term > ths->pRaftStore->currentTerm) {
  //    syncNodeUpdateTerm(ths, pMsg->term);
  //  }

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    char logBuf[128] = {0};
138
    snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu",
139 140 141 142 143 144
             pMsg->term, ths->pRaftStore->currentTerm);
    syncNodeLog2(logBuf, ths);
    sError("%s", logBuf);
    return ret;
  }

145
  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
146 147 148 149

  if (pMsg->success) {
    // nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
150 151 152 153

    if (gRaftDetailLog) {
      sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
    }
154 155 156 157 158

    // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
    syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);

    // maybe commit
159 160 161
    if (ths->state == TAOS_SYNC_STATE_LEADER) {
      syncMaybeAdvanceCommitIndex(ths);
    }
162 163 164

  } else {
    SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
165 166 167
    if (gRaftDetailLog) {
      sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
    }
168 169 170 171

    // notice! int64, uint64
    if (nextIndex > SYNC_INDEX_BEGIN) {
      --nextIndex;
172

173 174 175 176 177 178 179 180 181 182 183 184 185
      // get sender
      SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
      ASSERT(pSender != NULL);
      bool      hasSnapshot = syncNodeHasSnapshot(ths);
      SSnapshot snapshot;
      ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

      // start sending snapshot first time
      // start here, stop by receiver
      if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
          pMsg->privateTerm < pSender->privateTerm) {
        snapshotSenderStart(pSender);

M
Minghao Li 已提交
186 187 188 189
        char     host[128];
        uint16_t port;
        syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);

190 191
        if (gRaftDetailLog) {
          char* s = snapshotSender2Str(pSender);
M
Minghao Li 已提交
192
          sDebug(
193 194
              "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
              "lastApplyTerm:%lu "
M
Minghao Li 已提交
195
              "lastConfigIndex:%ld privateTerm:%lu "
196
              "sender:%s",
197 198
              ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
              pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s);
199 200
          taosMemoryFree(s);
        } else {
M
Minghao Li 已提交
201
          sDebug(
202
              "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
M
Minghao Li 已提交
203
              "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
204 205
              ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
              pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
206
        }
207 208 209 210 211 212 213
      }

      SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;

      // update nextIndex to sentryIndex
      if (nextIndex <= sentryIndex) {
        nextIndex = sentryIndex;
214 215
      }

216 217 218
    } else {
      nextIndex = SYNC_INDEX_BEGIN;
    }
M
Minghao Li 已提交
219

220
    syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
221 222 223
    if (gRaftDetailLog) {
      sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
    }
224 225
  }

226 227
  syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
  syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
228
  if (gRaftDetailLog) {
M
Minghao Li 已提交
229 230 231 232 233
    SSnapshot snapshot;
    ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
    sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
           snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  }
234 235 236

  return ret;
}