syncAppendEntriesReply.c 5.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
18
#include "syncCommit.h"
M
Minghao Li 已提交
19
#include "syncIndexMgr.h"
20
#include "syncPipeline.h"
S
Shengliang Guan 已提交
21
#include "syncMessage.h"
22
#include "syncRaftEntry.h"
M
Minghao Li 已提交
23
#include "syncRaftStore.h"
M
Minghao Li 已提交
24
#include "syncReplication.h"
25
#include "syncSnapshot.h"
M
Minghao Li 已提交
26
#include "syncUtil.h"
M
Minghao Li 已提交
27

M
Minghao Li 已提交
28 29 30 31 32 33 34 35 36 37 38 39
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
//    /\ m.mterm = currentTerm[i]
//    /\ \/ /\ m.msuccess \* successful
//          /\ nextIndex'  = [nextIndex  EXCEPT ![i][j] = m.mmatchIndex + 1]
//          /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
//       \/ /\ \lnot m.msuccess \* not successful
//          /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
//                               Max({nextIndex[i][j] - 1, 1})]
//          /\ UNCHANGED <<matchIndex>>
//    /\ Discard(m)
//    /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
M
Minghao Li 已提交
41

42 43
/*
 * Handle an append-entries reply that arrived as an RPC message.
 *
 * The reply is read from pRpcMsg->pCont. It is ignored (return 0) when the
 * sender is not in this node's raft group, when its term is lower than the
 * local current term, or when this node is not in the leader state.
 *
 * As leader:
 *   - a reply carrying a higher term triggers syncNodeStepDown() and the
 *     function returns -1;
 *   - on a successful reply the sender's entry in pMatchIndex is raised if
 *     the reported matchIndex is larger, then a commit index is computed via
 *     syncNodeCheckCommitIndex() and handed to syncLogBufferCommit();
 *   - finally the reply is passed to the sender's log replication manager,
 *     if one is found.
 *
 * Return values of syncLogBufferCommit() and syncLogReplMgrProcessReply()
 * are deliberately discarded.
 *
 * @param ths      local sync node
 * @param pRpcMsg  RPC message whose pCont holds a SyncAppendEntriesReply
 * @return 0 normally, -1 when the leader stepped down because of a higher term
 */
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;

  // sender has already been dropped from the group: nothing to do
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config");
    return 0;
  }

  // reply belongs to an earlier term: discard it
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
    return 0;
  }

  // everything below applies to the leader only
  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  // a peer is ahead of us in term: give up leadership
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
    syncNodeStepDown(ths, pMsg->term);
    return -1;
  }

  // lower and higher terms were both handled above
  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);

  sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ",  term:%" PRId64 ", matchIndex:%" PRId64 "",
         pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);

  if (pMsg->success) {
    // the recorded match index only ever moves forward
    SyncIndex recordedMatch = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
    if (recordedMatch < pMsg->matchIndex) {
      syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
    }

    // candidate is bounded by both the peer's and the local log buffer's match index
    SyncIndex candidateIndex = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex);
    SyncIndex newCommitIndex = syncNodeCheckCommitIndex(ths, candidateIndex);
    (void)syncLogBufferCommit(ths->pLogBuf, ths, newCommitIndex);
  }

  // hand the reply to the replication manager of the sender; a missing
  // manager is tolerated rather than asserted on
  SSyncLogReplMgr* pReplMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pReplMgr == NULL) {
    return 0;
  }
  (void)syncLogReplMgrProcessReply(pReplMgr, ths, pMsg);

  return 0;
}

/*
 * Legacy handler for an append-entries reply.
 *
 * The reply is ignored (return 0) when the sender is not in this node's raft
 * group or when its term is lower than the local current term.
 *
 * As leader:
 *   - a reply carrying a higher term triggers syncNodeStepDown() and the
 *     function returns -1;
 *   - on failure the sender's next index is moved back by one, but not below
 *     SYNC_INDEX_BEGIN;
 *   - on success the sender's match index is raised if the reported one is
 *     larger (followed by syncMaybeAdvanceCommitIndex()), and its next index
 *     is set to matchIndex + 1;
 *   - if the reply's lastSendIndex equals the one stored in the peer state,
 *     the round-trip time is traced and syncNodeReplicateOne() is called for
 *     that peer.
 *
 * Every path that reaches the end (leader or not) logs the reply with the
 * tag "process".
 *
 * @param ths   local sync node
 * @param pMsg  the received reply
 * @return 0 normally, -1 when the leader stepped down because of a higher term
 */
int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  // sender has already been dropped from the group: nothing to do
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config");
    return 0;
  }

  // reply belongs to an earlier term: discard it
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
    return 0;
  }

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // a peer is ahead of us in term: give up leadership
    if (pMsg->term > ths->pRaftStore->currentTerm) {
      syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
      syncNodeStepDown(ths, pMsg->term);
      return -1;
    }

    // lower and higher terms were both handled above
    ASSERT(pMsg->term == ths->pRaftStore->currentTerm);

    if (!pMsg->success) {
      // rejected: retry one entry earlier, never going below the first index
      SyncIndex currentNext = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
      SyncIndex retryNext = (currentNext > SYNC_INDEX_BEGIN) ? (currentNext - 1) : currentNext;
      syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), retryNext);
    } else {
      // accepted: match index only moves forward, next index follows the reply
      SyncIndex recordedMatch = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
      if (recordedMatch < pMsg->matchIndex) {
        syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
        syncMaybeAdvanceCommitIndex(ths);
      }
      syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
    }

    // continue replication only when this reply answers the latest send
    SPeerState* pPeer = syncNodeGetPeerState(ths, &(pMsg->srcId));
    ASSERT(pPeer != NULL);

    if (pPeer->lastSendIndex == pMsg->lastSendIndex) {
      int64_t nowMs = taosGetTimestampMs();
      int64_t rttMs = nowMs - pPeer->lastSendTime;
      sNTrace(ths, "sync-append-entries rtt elapsed:%" PRId64 ", index:%" PRId64, rttMs, pPeer->lastSendIndex);

      syncNodeReplicateOne(ths, &(pMsg->srcId), true);
    }
  }

  syncLogRecvAppendEntriesReply(ths, pMsg, "process");
  return 0;
}