syncAppendEntries.c 24.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntries.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
23
#include "wal.h"
M
Minghao Li 已提交
24

M
Minghao Li 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
//    LET logOk == \/ m.mprevLogIndex = 0
//                 \/ /\ m.mprevLogIndex > 0
//                    /\ m.mprevLogIndex <= Len(log[i])
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ /\ \* reject request
//                \/ m.mterm < currentTerm[i]
//                \/ /\ m.mterm = currentTerm[i]
//                   /\ state[i] = Follower
//                   /\ \lnot logOk
//             /\ Reply([mtype           |-> AppendEntriesResponse,
//                       mterm           |-> currentTerm[i],
//                       msuccess        |-> FALSE,
//                       mmatchIndex     |-> 0,
//                       msource         |-> i,
//                       mdest           |-> j],
//                       m)
//             /\ UNCHANGED <<serverVars, logVars>>
//          \/ \* return to follower state
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Candidate
//             /\ state' = [state EXCEPT ![i] = Follower]
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
//          \/ \* accept request
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Follower
//             /\ logOk
//             /\ LET index == m.mprevLogIndex + 1
//                IN \/ \* already done with request
//                       /\ \/ m.mentries = << >>
//                          \/ /\ m.mentries /= << >>
//                             /\ Len(log[i]) >= index
//                             /\ log[i][index].term = m.mentries[1].term
//                          \* This could make our commitIndex decrease (for
//                          \* example if we process an old, duplicated request),
//                          \* but that doesn't really affect anything.
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
//                                              m.mcommitIndex]
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
//                                 mterm           |-> currentTerm[i],
//                                 msuccess        |-> TRUE,
//                                 mmatchIndex     |-> m.mprevLogIndex +
//                                                     Len(m.mentries),
//                                 msource         |-> i,
//                                 mdest           |-> j],
//                                 m)
//                       /\ UNCHANGED <<serverVars, log>>
//                   \/ \* conflict: remove 1 entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) >= index
//                       /\ log[i][index].term /= m.mentries[1].term
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
//                                          log[i][index2]]
//                          IN log' = [log EXCEPT ![i] = new]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//                   \/ \* no conflict: append entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) = m.mprevLogIndex
//                       /\ log' = [log EXCEPT ![i] =
//                                      Append(log[i], m.mentries[1])]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//       /\ UNCHANGED <<candidateVars, leaderVars>>
//
M
Minghao Li 已提交
90 91
/*
 * Handle an AppendEntries request from the leader (getEntry/appendEntry/truncate log-store path).
 *
 * Mirrors the HandleAppendEntriesRequest action of the TLA+ spec above:
 *   - a higher sender term is adopted first (syncNodeUpdateTerm);
 *   - stale-term requests, and follower requests whose prevLogIndex/prevLogTerm do not
 *     match the local log, are answered with success = false;
 *   - a candidate that receives a request in its current term steps down to follower
 *     (no reply is sent on that path);
 *   - otherwise the single serialized entry in pMsg (if any) is reconciled with the local
 *     log, a success reply is sent, and the commit index is advanced to the leader's
 *     commit index when the local log already reaches it, applying the newly committed
 *     entries to the state machine.
 *
 * Returns 0 on every path.
 */
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;

  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);

  // adopt a newer term before evaluating the request
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  assert(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer and remember the sender when the request is in our current term
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  assert(pMsg->dataLen >= 0);

  // term of the local entry at prevLogIndex, or 0 when no such local entry exists
  SyncTerm localPreLogTerm = 0;
  if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
    SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
    assert(pEntry != NULL);
    localPreLogTerm = pEntry->term;
    syncEntryDestory(pEntry);
  }

  // logOk of the TLA+ spec
  bool logOK =
      (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
      ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
       (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));

  // reject request
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
    // preIndex = -1, or has preIndex entry in local log
    assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));

    // has extra entries (> preIndex) in local log
    bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);

    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;

    sTrace(
        "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

    if (hasExtraEntries && hasAppendEntries) {
      // not conflict by default
      bool conflict = false;

      SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
      SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
      assert(pExtraEntry != NULL);

      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // log not match, conflict
      assert(extraIndex == pAppendEntry->index);
      if (pExtraEntry->term != pAppendEntry->term) {
        conflict = true;
      }

      if (conflict) {
        // roll back every local entry in [delBegin, delEnd]
        SyncIndex delBegin = extraIndex;
        SyncIndex delEnd = ths->pLogStore->getLastIndex(ths->pLogStore);

        sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);

        // notice! reverse roll back: newest entry first, down to delBegin.
        // (Previously the bounds were swapped, so at most one entry was rolled back.)
        if (ths->pFsm != NULL && ths->pFsm->FpRollBackCb != NULL) {
          for (SyncIndex index = delEnd; index >= delBegin; --index) {
            SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
            assert(pRollBackEntry != NULL);

            if (syncUtilUserRollback(pRollBackEntry->msgType)) {
              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

              SFsmCbMeta cbMeta = {0};
              cbMeta.index = pRollBackEntry->index;
              cbMeta.isWeak = pRollBackEntry->isWeak;
              cbMeta.code = 0;
              cbMeta.state = ths->state;
              cbMeta.seqNum = pRollBackEntry->seqNum;
              ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
              rpcFreeCont(rpcMsg.pCont);
            }

            syncEntryDestory(pRollBackEntry);
          }
        }

        // delete conflict entries (from extraIndex onward)
        ths->pLogStore->truncate(ths->pLogStore, extraIndex);

        // append new entries
        ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

        // pre commit
        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
        if (ths->pFsm != NULL) {
          if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
            SFsmCbMeta cbMeta = {0};
            cbMeta.index = pAppendEntry->index;
            cbMeta.isWeak = pAppendEntry->isWeak;
            cbMeta.code = 2;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pAppendEntry->seqNum;
            ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
          }
        }
        rpcFreeCont(rpcMsg.pCont);
      }

      // free memory
      syncEntryDestory(pExtraEntry);
      syncEntryDestory(pAppendEntry);

    } else if (hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else if (!hasExtraEntries && hasAppendEntries) {
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // append new entries
      ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

      // pre commit
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
      if (ths->pFsm != NULL) {
        if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
          SFsmCbMeta cbMeta = {0};
          cbMeta.index = pAppendEntry->index;
          cbMeta.isWeak = pAppendEntry->isWeak;
          cbMeta.code = 3;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pAppendEntry->seqNum;
          ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
        }
      }
      rpcFreeCont(rpcMsg.pCont);

      // free memory
      syncEntryDestory(pAppendEntry);

    } else if (!hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else {
      assert(0);
    }

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    // at most one entry is carried per request, so the match point is prevLogIndex or prevLogIndex + 1
    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    // maybe update commit index from leader
    if (pMsg->commitIndex > ths->commitIndex) {
      // has commit entry in local
      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
        ths->commitIndex = pMsg->commitIndex;

        // call back Wal
        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);

        // execute fsm
        if (ths->pFsm != NULL) {
          for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
            if (i != SYNC_INDEX_INVALID) {
              SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
              assert(pEntry != NULL);

              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pEntry, &rpcMsg);

              if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
                SFsmCbMeta cbMeta = {0};
                cbMeta.index = pEntry->index;
                cbMeta.isWeak = pEntry->isWeak;
                cbMeta.code = 0;
                cbMeta.state = ths->state;
                cbMeta.seqNum = pEntry->seqNum;
                cbMeta.term = pEntry->term;
                cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                cbMeta.flag = 0x11;

                // skip entries the state machine's snapshot already covers
                SSnapshot snapshot;
                ASSERT(ths->pFsm->FpGetSnapshot != NULL);
                ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

                bool needExecute = true;
                if (cbMeta.index <= snapshot.lastApplyIndex) {
                  needExecute = false;
                }

                if (needExecute) {
                  ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
                }
              }

              // config change
              if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
                SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

                SSyncCfg newSyncCfg;
                int32_t  cfgCode = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
                ASSERT(cfgCode == 0);

                // update new config myIndex (matched by fqdn + port)
                bool hit = false;
                for (int j = 0; j < newSyncCfg.replicaNum; ++j) {
                  if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[j].nodeFqdn) == 0 &&
                      ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[j].nodePort) {
                    newSyncCfg.myIndex = j;
                    hit = true;
                    break;
                  }
                }

                SReConfigCbMeta cbMeta = {0};
                // Initialized so FpReConfigCb never sees an indeterminate value when this node is
                // not in the new config (syncNodeUpdateConfig is only called on a hit).
                // NOTE(review): confirm whether isDrop should be true in the !hit case.
                bool isDrop = false;

                // I am in newConfig
                if (hit) {
                  syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);

                  // change isStandBy to normal
                  if (!isDrop) {
                    if (ths->state == TAOS_SYNC_STATE_LEADER) {
                      syncNodeBecomeLeader(ths);
                    } else {
                      syncNodeBecomeFollower(ths);
                    }
                  }

                  char* sOld = syncCfg2Str(&oldSyncCfg);
                  char* sNew = syncCfg2Str(&newSyncCfg);
                  sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
                  taosMemoryFree(sOld);
                  taosMemoryFree(sNew);
                }

                // always call FpReConfigCb
                if (ths->pFsm->FpReConfigCb != NULL) {
                  cbMeta.code = 0;
                  cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                  cbMeta.index = pEntry->index;
                  cbMeta.term = pEntry->term;
                  cbMeta.oldCfg = oldSyncCfg;
                  cbMeta.flag = 0x11;
                  cbMeta.isDrop = isDrop;
                  ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
                }
              }

              // restore finish: the last local entry has now been applied
              if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
                if (ths->restoreFinish == false) {
                  if (ths->pFsm->FpRestoreFinishCb != NULL) {
                    ths->pFsm->FpRestoreFinishCb(ths->pFsm);
                  }
                  ths->restoreFinish = true;
                  sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
                }
              }

              rpcFreeCont(rpcMsg.pCont);
              syncEntryDestory(pEntry);
            }
          }
        }
      }
    }
  }

  return ret;
}
M
Minghao Li 已提交
435

436 437 438 439 440 441
/*
 * Log-matching check for an AppendEntries request (logOk in the TLA+ spec).
 *
 * Returns true when prevLogIndex is SYNC_INDEX_INVALID (no previous entry to match), or when
 * the local log has an entry at prevLogIndex whose term equals pMsg->prevLogTerm.
 * Returns false when prevLogIndex is beyond the local last index or the terms differ.
 */
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
  // nothing precedes the leader's entries: trivially consistent
  if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
    return true;
  }

  // the leader's previous entry must exist locally...
  SyncIndex lastLocalIndex = syncNodeGetLastIndex(pSyncNode);
  if (pMsg->prevLogIndex > lastLocalIndex) {
    return false;
  }

  // ...and carry the same term (term of the entry preceding prevLogIndex + 1)
  SyncTerm localTermAtPrev = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
  return pMsg->prevLogTerm == localTermAtPrev;
}

/*
 * Reconcile the local entry at prevLogIndex + 1 with the single entry carried in pMsg.
 *
 * Preconditions (asserted): the local log has an entry at pMsg->prevLogIndex + 1, and pMsg->data
 * deserializes to an entry with that same index.
 *
 * On return *ppAppendEntry owns the deserialized entry from pMsg; the caller frees it with
 * syncEntryDestory. If the local entry's term differs from it (a conflict), every local entry
 * from prevLogIndex + 1 up to the last index is passed to FpRollBackCb (newest first, user
 * messages only) and then the log is truncated from prevLogIndex + 1. Without a conflict the
 * log is left unchanged.
 *
 * Returns 0.
 */
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry) {
  int32_t code = 0;
  *ppAppendEntry = NULL;

  SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
  SSyncRaftEntry* pExtraEntry = NULL;  // initialized so a failed lookup cannot leave it indeterminate
  code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry);
  ASSERT(code == 0);
  ASSERT(pExtraEntry != NULL);

  *ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
  ASSERT(*ppAppendEntry != NULL);

  // log not match, conflict, need delete
  ASSERT(extraIndex == (*ppAppendEntry)->index);
  bool conflict = (pExtraEntry->term != (*ppAppendEntry)->term);
  syncEntryDestory(pExtraEntry);

  if (!conflict) {
    return 0;
  }

  // roll back every local entry in [delBegin, delEnd]
  SyncIndex delBegin = extraIndex;
  SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);

  sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);

  // notice! reverse roll back: newest entry first, down to delBegin.
  // (Previously the bounds were swapped, so at most one entry was rolled back.)
  if (ths->pFsm != NULL && ths->pFsm->FpRollBackCb != NULL) {
    for (SyncIndex index = delEnd; index >= delBegin; --index) {
      SSyncRaftEntry* pRollBackEntry = NULL;
      code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
      ASSERT(code == 0);
      ASSERT(pRollBackEntry != NULL);

      if (syncUtilUserRollback(pRollBackEntry->msgType)) {
        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

        SFsmCbMeta cbMeta = {0};
        cbMeta.index = pRollBackEntry->index;
        cbMeta.isWeak = pRollBackEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pRollBackEntry->seqNum;
        ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
        rpcFreeCont(rpcMsg.pCont);
      }

      syncEntryDestory(pRollBackEntry);
    }
  }

  // delete conflict entries (from extraIndex onward)
  code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex);
  ASSERT(code == 0);

  return 0;
}

/*
 * Offer a freshly appended entry to the state machine's pre-commit callback.
 *
 * The callback runs only when ths->pFsm and FpPreCommitCb are set and syncUtilUserPreCommit
 * accepts the entry's original RPC type. The callback metadata is zero-initialized, so fields
 * not set here (e.g. term/currentTerm/flag) are passed as 0 instead of indeterminate values.
 *
 * Returns 0.
 */
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  SRpcMsg rpcMsg;
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  if (ths->pFsm != NULL && ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
    SFsmCbMeta cbMeta = {0};
    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.code = 2;
    cbMeta.state = ths->state;
    cbMeta.seqNum = pEntry->seqNum;
    ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
  }

  rpcFreeCont(rpcMsg.pCont);
  return 0;
}

535 536
/*
 * Handle an AppendEntries request using the syncLog* log-store interface.
 *
 * Decision structure (TLA+ HandleAppendEntriesRequest):
 *   case 1: stale term, or follower whose log does not match prevLogIndex/prevLogTerm
 *           -> reply success = false;
 *   case 2: candidate receiving a request in its current term -> become follower, no reply;
 *   case 3: follower whose log matches -> reconcile the (single) carried entry with the local
 *           log, reply success = true, then advance the commit index to the leader's commit
 *           index if the local log reaches it and apply entries via syncNodeCommit (flag 0x11).
 *
 * Returns 0 on every path; internal failures are caught by ASSERT.
 */
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;
  int32_t code = 0;

  // print log
  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);

  // maybe update term
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  ASSERT(pMsg->dataLen >= 0);

  bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);

  // case1, reject request
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
    sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term,
           ths->pRaftStore->currentTerm, ths->state, logOK);

    // send response
    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // case 2, return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d",
           pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // case 3, accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
    // has extra entries (> preIndex) in local log
    SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
    bool      hasExtraEntries = myLastIndex > pMsg->prevLogIndex;

    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;

    sTrace(
        "recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, "
        "hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

    if (hasExtraEntries && hasAppendEntries) {
      // make log same (rolls back and truncates only on a term conflict)
      SSyncRaftEntry* pAppendEntry = NULL;
      code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry);
      ASSERT(code == 0);
      ASSERT(pAppendEntry != NULL);

      // Without a conflict the local log already holds this entry at the same index, so
      // appending it again would duplicate it ("already done with request" in the TLA+ spec).
      // After a conflict the log was truncated below the entry's index and it must be appended.
      if (pAppendEntry->index > ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
        // append new entries
        code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
        ASSERT(code == 0);

        // pre commit
        code = syncNodePreCommit(ths, pAppendEntry);
        ASSERT(code == 0);
      }

      syncEntryDestory(pAppendEntry);

    } else if (hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else if (!hasExtraEntries && hasAppendEntries) {
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      ASSERT(pAppendEntry != NULL);

      // append new entries
      code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
      ASSERT(code == 0);

      // pre commit
      code = syncNodePreCommit(ths, pAppendEntry);
      ASSERT(code == 0);

      syncEntryDestory(pAppendEntry);

    } else if (!hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else {
      ASSERT(0);
    }

    // prepare response msg
    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    // at most one entry is carried per request, so the match point is prevLogIndex or prevLogIndex + 1
    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    // send response
    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    // maybe update commit index from leader
    if (pMsg->commitIndex > ths->commitIndex) {
      // has commit entry in local
      if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
        ths->commitIndex = pMsg->commitIndex;

        // call back Wal
        code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
        ASSERT(code == 0);

        code = syncNodeCommit(ths, beginIndex, endIndex, 0x11);
        ASSERT(code == 0);
      }
    }
  }

  return ret;
}