syncAppendEntries.c 14.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntries.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
//    LET logOk == \/ m.mprevLogIndex = 0
//                 \/ /\ m.mprevLogIndex > 0
//                    /\ m.mprevLogIndex <= Len(log[i])
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ /\ \* reject request
//                \/ m.mterm < currentTerm[i]
//                \/ /\ m.mterm = currentTerm[i]
//                   /\ state[i] = Follower
//                   /\ \lnot logOk
//             /\ Reply([mtype           |-> AppendEntriesResponse,
//                       mterm           |-> currentTerm[i],
//                       msuccess        |-> FALSE,
//                       mmatchIndex     |-> 0,
//                       msource         |-> i,
//                       mdest           |-> j],
//                       m)
//             /\ UNCHANGED <<serverVars, logVars>>
//          \/ \* return to follower state
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Candidate
//             /\ state' = [state EXCEPT ![i] = Follower]
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
//          \/ \* accept request
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Follower
//             /\ logOk
//             /\ LET index == m.mprevLogIndex + 1
//                IN \/ \* already done with request
//                       /\ \/ m.mentries = << >>
//                          \/ /\ m.mentries /= << >>
//                             /\ Len(log[i]) >= index
//                             /\ log[i][index].term = m.mentries[1].term
//                          \* This could make our commitIndex decrease (for
//                          \* example if we process an old, duplicated request),
//                          \* but that doesn't really affect anything.
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
//                                              m.mcommitIndex]
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
//                                 mterm           |-> currentTerm[i],
//                                 msuccess        |-> TRUE,
//                                 mmatchIndex     |-> m.mprevLogIndex +
//                                                     Len(m.mentries),
//                                 msource         |-> i,
//                                 mdest           |-> j],
//                                 m)
//                       /\ UNCHANGED <<serverVars, log>>
//                   \/ \* conflict: remove 1 entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) >= index
//                       /\ log[i][index].term /= m.mentries[1].term
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
//                                          log[i][index2]]
//                          IN log' = [log EXCEPT ![i] = new]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//                   \/ \* no conflict: append entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) = m.mprevLogIndex
//                       /\ log' = [log EXCEPT ![i] =
//                                      Append(log[i], m.mentries[1])]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//       /\ UNCHANGED <<candidateVars, leaderVars>>
//
M
Minghao Li 已提交
89 90
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;
M
Minghao Li 已提交
91 92 93 94

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);
M
Minghao Li 已提交
95

M
Minghao Li 已提交
96 97 98 99 100
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  assert(pMsg->term <= ths->pRaftStore->currentTerm);

M
Minghao Li 已提交
101
  // reset elect timer
M
Minghao Li 已提交
102 103 104 105 106 107 108
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  assert(pMsg->dataLen >= 0);

  SyncTerm localPreLogTerm = 0;
M
Minghao Li 已提交
109
  if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
M
Minghao Li 已提交
110
    SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
M
Minghao Li 已提交
111 112 113 114 115 116 117 118
    assert(pEntry != NULL);
    localPreLogTerm = pEntry->term;
    syncEntryDestory(pEntry);
  }

  bool logOK =
      (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
      ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
M
Minghao Li 已提交
119
       (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
M
Minghao Li 已提交
120

M
Minghao Li 已提交
121
  // reject request
M
Minghao Li 已提交
122 123
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
M
Minghao Li 已提交
124 125 126 127 128
    sTrace(
        "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

M
Minghao Li 已提交
129
    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
M
Minghao Li 已提交
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
M
Minghao Li 已提交
146 147 148 149 150
    sTrace(
        "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

M
Minghao Li 已提交
151
    syncNodeBecomeFollower(ths);
M
Minghao Li 已提交
152

M
Minghao Li 已提交
153
    // ret or reply?
M
Minghao Li 已提交
154
    return ret;
M
Minghao Li 已提交
155 156 157 158
  }

  // accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
M
Minghao Li 已提交
159 160
    // preIndex = -1, or has preIndex entry in local log
    assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
M
Minghao Li 已提交
161

M
Minghao Li 已提交
162 163
    // has extra entries (> preIndex) in local log
    bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
M
Minghao Li 已提交
164

M
Minghao Li 已提交
165 166
    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;
M
Minghao Li 已提交
167

M
Minghao Li 已提交
168 169 170 171
    sTrace(
        "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
M
Minghao Li 已提交
172

M
Minghao Li 已提交
173 174 175
    if (hasExtraEntries && hasAppendEntries) {
      // not conflict by default
      bool conflict = false;
M
Minghao Li 已提交
176

M
Minghao Li 已提交
177 178 179
      SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
      SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
      assert(pExtraEntry != NULL);
M
Minghao Li 已提交
180

M
Minghao Li 已提交
181 182
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);
M
Minghao Li 已提交
183

M
Minghao Li 已提交
184 185 186 187 188
      // log not match, conflict
      assert(extraIndex == pAppendEntry->index);
      if (pExtraEntry->term != pAppendEntry->term) {
        conflict = true;
      }
M
Minghao Li 已提交
189

M
Minghao Li 已提交
190 191 192 193
      if (conflict) {
        // roll back
        SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
        SyncIndex delEnd = extraIndex;
M
Minghao Li 已提交
194

M
Minghao Li 已提交
195
        sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
M
Minghao Li 已提交
196

M
Minghao Li 已提交
197 198 199 200 201
        // notice! reverse roll back!
        for (SyncIndex index = delEnd; index >= delBegin; --index) {
          if (ths->pFsm->FpRollBackCb != NULL) {
            SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
            assert(pRollBackEntry != NULL);
M
Minghao Li 已提交
202

M
Minghao Li 已提交
203
            // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
204
            if (syncUtilUserRollback(pRollBackEntry->msgType)) {
M
Minghao Li 已提交
205 206 207 208 209 210 211 212 213 214 215 216
              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

              SFsmCbMeta cbMeta;
              cbMeta.index = pRollBackEntry->index;
              cbMeta.isWeak = pRollBackEntry->isWeak;
              cbMeta.code = 0;
              cbMeta.state = ths->state;
              cbMeta.seqNum = pRollBackEntry->seqNum;
              ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
              rpcFreeCont(rpcMsg.pCont);
            }
M
Minghao Li 已提交
217

M
Minghao Li 已提交
218
            syncEntryDestory(pRollBackEntry);
M
Minghao Li 已提交
219 220 221
          }
        }

M
Minghao Li 已提交
222 223
        // delete confict entries
        ths->pLogStore->truncate(ths->pLogStore, extraIndex);
M
Minghao Li 已提交
224 225 226 227 228 229 230 231

        // append new entries
        ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

        // pre commit
        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
        if (ths->pFsm != NULL) {
M
Minghao Li 已提交
232
          // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
233
          if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
M
Minghao Li 已提交
234 235 236 237 238 239 240
            SFsmCbMeta cbMeta;
            cbMeta.index = pAppendEntry->index;
            cbMeta.isWeak = pAppendEntry->isWeak;
            cbMeta.code = 2;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pAppendEntry->seqNum;
            ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
241 242 243
          }
        }
        rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
244
      }
M
Minghao Li 已提交
245

M
Minghao Li 已提交
246 247 248
      // free memory
      syncEntryDestory(pExtraEntry);
      syncEntryDestory(pAppendEntry);
M
Minghao Li 已提交
249

M
Minghao Li 已提交
250 251
    } else if (hasExtraEntries && !hasAppendEntries) {
      // do nothing
M
Minghao Li 已提交
252

M
Minghao Li 已提交
253 254 255
    } else if (!hasExtraEntries && hasAppendEntries) {
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);
M
Minghao Li 已提交
256

M
Minghao Li 已提交
257 258
      // append new entries
      ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
M
Minghao Li 已提交
259

M
Minghao Li 已提交
260 261 262 263
      // pre commit
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
      if (ths->pFsm != NULL) {
M
Minghao Li 已提交
264
        // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
265
        if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
M
Minghao Li 已提交
266 267 268 269 270 271 272
          SFsmCbMeta cbMeta;
          cbMeta.index = pAppendEntry->index;
          cbMeta.isWeak = pAppendEntry->isWeak;
          cbMeta.code = 3;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pAppendEntry->seqNum;
          ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
273
        }
M
Minghao Li 已提交
274
      }
M
Minghao Li 已提交
275
      rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
276

M
Minghao Li 已提交
277 278 279 280 281
      // free memory
      syncEntryDestory(pAppendEntry);

    } else if (!hasExtraEntries && !hasAppendEntries) {
      // do nothing
M
Minghao Li 已提交
282

M
Minghao Li 已提交
283 284
    } else {
      assert(0);
M
Minghao Li 已提交
285
    }
M
Minghao Li 已提交
286

M
Minghao Li 已提交
287
    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
M
Minghao Li 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);
M
Minghao Li 已提交
303

M
Minghao Li 已提交
304
    // maybe update commit index from leader
M
Minghao Li 已提交
305
    if (pMsg->commitIndex > ths->commitIndex) {
M
Minghao Li 已提交
306
      // has commit entry in local
M
Minghao Li 已提交
307
      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
M
Minghao Li 已提交
308 309 310 311
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
M
Minghao Li 已提交
312
        ths->commitIndex = pMsg->commitIndex;
M
Minghao Li 已提交
313 314

        // call back Wal
M
Minghao Li 已提交
315
        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
M
Minghao Li 已提交
316 317 318 319 320 321 322 323 324 325 326

        // execute fsm
        if (ths->pFsm != NULL) {
          for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
            if (i != SYNC_INDEX_INVALID) {
              SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
              assert(pEntry != NULL);

              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
327
              // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
328
              if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
329 330 331 332 333 334
                SFsmCbMeta cbMeta;
                cbMeta.index = pEntry->index;
                cbMeta.isWeak = pEntry->isWeak;
                cbMeta.code = 0;
                cbMeta.state = ths->state;
                cbMeta.seqNum = pEntry->seqNum;
335 336
                cbMeta.term = pEntry->term;
                cbMeta.currentTerm = ths->pRaftStore->currentTerm;
M
Minghao Li 已提交
337
                ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
338 339
              }

M
Minghao Li 已提交
340 341 342
              // config change
              if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
                SSyncCfg newSyncCfg;
M
Minghao Li 已提交
343
                int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
M
Minghao Li 已提交
344 345 346
                ASSERT(ret == 0);

                syncNodeUpdateConfig(ths, &newSyncCfg);
M
Minghao Li 已提交
347 348 349 350 351
                if (ths->state == TAOS_SYNC_STATE_LEADER) {
                  syncNodeBecomeLeader(ths);
                } else {
                  syncNodeBecomeFollower(ths);
                }
M
Minghao Li 已提交
352 353
              }

M
Minghao Li 已提交
354 355 356 357 358
              rpcFreeCont(rpcMsg.pCont);
              syncEntryDestory(pEntry);
            }
          }
        }
M
Minghao Li 已提交
359 360 361 362
      }
    }
  }

M
Minghao Li 已提交
363 364
  return ret;
}