You need to sign in or sign up before continuing.
syncAppendEntries.c 25.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntries.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
23
#include "wal.h"
M
Minghao Li 已提交
24

M
Minghao Li 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
//    LET logOk == \/ m.mprevLogIndex = 0
//                 \/ /\ m.mprevLogIndex > 0
//                    /\ m.mprevLogIndex <= Len(log[i])
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ /\ \* reject request
//                \/ m.mterm < currentTerm[i]
//                \/ /\ m.mterm = currentTerm[i]
//                   /\ state[i] = Follower
//                   /\ \lnot logOk
//             /\ Reply([mtype           |-> AppendEntriesResponse,
//                       mterm           |-> currentTerm[i],
//                       msuccess        |-> FALSE,
//                       mmatchIndex     |-> 0,
//                       msource         |-> i,
//                       mdest           |-> j],
//                       m)
//             /\ UNCHANGED <<serverVars, logVars>>
//          \/ \* return to follower state
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Candidate
//             /\ state' = [state EXCEPT ![i] = Follower]
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
//          \/ \* accept request
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Follower
//             /\ logOk
//             /\ LET index == m.mprevLogIndex + 1
//                IN \/ \* already done with request
//                       /\ \/ m.mentries = << >>
//                          \/ /\ m.mentries /= << >>
//                             /\ Len(log[i]) >= index
//                             /\ log[i][index].term = m.mentries[1].term
//                          \* This could make our commitIndex decrease (for
//                          \* example if we process an old, duplicated request),
//                          \* but that doesn't really affect anything.
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
//                                              m.mcommitIndex]
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
//                                 mterm           |-> currentTerm[i],
//                                 msuccess        |-> TRUE,
//                                 mmatchIndex     |-> m.mprevLogIndex +
//                                                     Len(m.mentries),
//                                 msource         |-> i,
//                                 mdest           |-> j],
//                                 m)
//                       /\ UNCHANGED <<serverVars, log>>
//                   \/ \* conflict: remove 1 entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) >= index
//                       /\ log[i][index].term /= m.mentries[1].term
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
//                                          log[i][index2]]
//                          IN log' = [log EXCEPT ![i] = new]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//                   \/ \* no conflict: append entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) = m.mprevLogIndex
//                       /\ log' = [log EXCEPT ![i] =
//                                      Append(log[i], m.mentries[1])]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//       /\ UNCHANGED <<candidateVars, leaderVars>>
//
M
Minghao Li 已提交
90 91
// Handles a received SyncAppendEntries message, following the
// HandleAppendEntriesRequest action of the Raft TLA+ spec quoted above:
//   - reject      : the message term is stale, or this node is a follower whose
//                   log fails the prevLogIndex/prevLogTerm check; a reply with
//                   success=false is sent.
//   - step down   : a candidate that sees a message of its own term becomes a
//                   follower and returns without replying.
//   - accept      : a follower with a matching log resolves a conflicting entry
//                   (rollback callbacks + truncate), appends the carried entry,
//                   replies with success=true, and then applies entries that the
//                   leader reports as committed.
//
// ths  : local sync node; its term, state, log store and commitIndex may change.
// pMsg : received message; it is only read here and is not released.
//
// Always returns 0; unexpected conditions are trapped by assert/ASSERT.
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;

  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);

  // update the local term when the message carries a newer one
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  assert(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  assert(pMsg->dataLen >= 0);

  // term of the local entry at prevLogIndex; stays 0 when no such entry exists
  SyncTerm localPreLogTerm = 0;
  if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
    SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
    assert(pEntry != NULL);
    localPreLogTerm = pEntry->term;
    syncEntryDestory(pEntry);
  }

  // logOK: either there is no previous entry to match, or the local entry at
  // prevLogIndex exists and carries prevLogTerm
  bool logOK =
      (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
      ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
       (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));

  // reject request
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
    // preIndex = -1, or has preIndex entry in local log
    assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));

    // has extra entries (> preIndex) in local log
    bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);

    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;

    sTrace(
        "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

    if (hasExtraEntries && hasAppendEntries) {
      SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
      SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
      assert(pExtraEntry != NULL);

      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // same index but different term means the local entry conflicts
      assert(extraIndex == pAppendEntry->index);
      bool conflict = (pExtraEntry->term != pAppendEntry->term);

      if (conflict) {
        // roll back
        SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
        SyncIndex delEnd = extraIndex;

        sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);

        // notice! reverse roll back!
        // Walk from the last local entry (delBegin) down to the first conflicting
        // one (delEnd). delEnd <= delBegin always holds here, so starting the loop
        // at delEnd would skip every entry unless exactly one entry conflicts.
        if (ths->pFsm != NULL && ths->pFsm->FpRollBackCb != NULL) {
          for (SyncIndex index = delBegin; index >= delEnd; --index) {
            SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
            assert(pRollBackEntry != NULL);

            // NOTE(review): this filter looks at msgType while the pre-commit and
            // commit filters look at originalRpcType; confirm this is intended.
            if (syncUtilUserRollback(pRollBackEntry->msgType)) {
              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

              // zero-init so fields not assigned below are never indeterminate
              SFsmCbMeta cbMeta = {0};
              cbMeta.index = pRollBackEntry->index;
              cbMeta.isWeak = pRollBackEntry->isWeak;
              cbMeta.code = 0;
              cbMeta.state = ths->state;
              cbMeta.seqNum = pRollBackEntry->seqNum;
              ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
              rpcFreeCont(rpcMsg.pCont);
            }

            syncEntryDestory(pRollBackEntry);
          }
        }

        // delete conflict entries
        ths->pLogStore->truncate(ths->pLogStore, extraIndex);

        // append new entries
        ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

        // pre commit
        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
        if (ths->pFsm != NULL) {
          if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
            SFsmCbMeta cbMeta = {0};
            cbMeta.index = pAppendEntry->index;
            cbMeta.isWeak = pAppendEntry->isWeak;
            cbMeta.code = 2;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pAppendEntry->seqNum;
            ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
          }
        }
        rpcFreeCont(rpcMsg.pCont);
      }

      // free memory
      syncEntryDestory(pExtraEntry);
      syncEntryDestory(pAppendEntry);

    } else if (hasAppendEntries) {
      // local log ends exactly at prevLogIndex: plain append
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // append new entries
      ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

      // pre commit
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
      if (ths->pFsm != NULL) {
        if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
          SFsmCbMeta cbMeta = {0};
          cbMeta.index = pAppendEntry->index;
          cbMeta.isWeak = pAppendEntry->isWeak;
          cbMeta.code = 3;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pAppendEntry->seqNum;
          ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
        }
      }
      rpcFreeCont(rpcMsg.pCont);

      // free memory
      syncEntryDestory(pAppendEntry);
    }
    // else: the message carries no entry, so the local log is left untouched

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    // a message carries at most one entry, located at prevLogIndex + 1
    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    // maybe update commit index from leader
    if (pMsg->commitIndex > ths->commitIndex) {
      // has commit entry in local
      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
        ths->commitIndex = pMsg->commitIndex;

        // call back Wal
        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);

        // execute fsm
        if (ths->pFsm != NULL) {
          for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
            if (i != SYNC_INDEX_INVALID) {
              SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
              assert(pEntry != NULL);

              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pEntry, &rpcMsg);

              if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
                SFsmCbMeta cbMeta = {0};
                cbMeta.index = pEntry->index;
                cbMeta.isWeak = pEntry->isWeak;
                cbMeta.code = 0;
                cbMeta.state = ths->state;
                cbMeta.seqNum = pEntry->seqNum;
                cbMeta.term = pEntry->term;
                cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                cbMeta.flag = 0x11;

                SSnapshot snapshot;
                ASSERT(ths->pFsm->FpGetSnapshot != NULL);
                ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

                // skip entries at or below the snapshot's lastApplyIndex
                bool needExecute = true;
                if (cbMeta.index <= snapshot.lastApplyIndex) {
                  needExecute = false;
                }

                if (needExecute) {
                  ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
                }
              }

              // config change
              if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
                SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

                SSyncCfg newSyncCfg;
                int32_t  cfgCode = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
                ASSERT(cfgCode == 0);

                // update new config myIndex
                bool hit = false;
                for (int k = 0; k < newSyncCfg.replicaNum; ++k) {
                  if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[k].nodeFqdn) == 0 &&
                      ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[k].nodePort) {
                    newSyncCfg.myIndex = k;
                    hit = true;
                    break;
                  }
                }

                SReConfigCbMeta reCfgMeta = {0};
                // Initialized because it is only written by syncNodeUpdateConfig when
                // this node is part of the new config, yet it is always forwarded to
                // FpReConfigCb below.
                // NOTE(review): confirm false is the wanted value when !hit.
                bool isDrop = false;

                // I am in newConfig
                if (hit) {
                  syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);

                  // change isStandBy to normal
                  if (!isDrop) {
                    if (ths->state == TAOS_SYNC_STATE_LEADER) {
                      syncNodeBecomeLeader(ths);
                    } else {
                      syncNodeBecomeFollower(ths);
                    }
                  }

                  char* sOld = syncCfg2Str(&oldSyncCfg);
                  char* sNew = syncCfg2Str(&newSyncCfg);
                  sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
                  taosMemoryFree(sOld);
                  taosMemoryFree(sNew);
                }

                // always call FpReConfigCb
                if (ths->pFsm->FpReConfigCb != NULL) {
                  reCfgMeta.code = 0;
                  reCfgMeta.currentTerm = ths->pRaftStore->currentTerm;
                  reCfgMeta.index = pEntry->index;
                  reCfgMeta.term = pEntry->term;
                  reCfgMeta.oldCfg = oldSyncCfg;
                  reCfgMeta.flag = 0x11;
                  reCfgMeta.isDrop = isDrop;
                  ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, reCfgMeta);
                }
              }

              // restore finish: fires once, when the entry just applied is the last local entry
              if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
                if (ths->restoreFinish == false) {
                  if (ths->pFsm->FpRestoreFinishCb != NULL) {
                    ths->pFsm->FpRestoreFinishCb(ths->pFsm);
                  }
                  ths->restoreFinish = true;
                  sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
                }
              }

              rpcFreeCont(rpcMsg.pCont);
              syncEntryDestory(pEntry);
            }
          }
        }
      }
    }
  }

  return ret;
}
M
Minghao Li 已提交
435

436 437 438 439 440 441
// Log-consistency check for a received SyncAppendEntries message.
//
// Returns true when the message names no previous entry
// (prevLogIndex == SYNC_INDEX_INVALID), or when prevLogIndex does not exceed
// the local last index and prevLogTerm equals the term obtained from
// syncNodeGetPreTerm(prevLogIndex + 1). Returns false otherwise.
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
  const SyncIndex prevIndex = pMsg->prevLogIndex;

  // nothing to match against
  if (prevIndex == SYNC_INDEX_INVALID) {
    return true;
  }

  // the previous entry must not lie beyond the end of the local log
  const SyncIndex localLastIndex = syncNodeGetLastIndex(pSyncNode);
  if (prevIndex > localLastIndex) {
    return false;
  }

  // prevIndex <= localLastIndex holds from here on, so only the terms remain
  // to be compared.
  // presumably syncNodeGetPreTerm(idx) yields the term of the entry before idx,
  // i.e. the local term at prevIndex -- TODO confirm against its definition
  const SyncTerm localPrevTerm = syncNodeGetPreTerm(pSyncNode, prevIndex + 1);
  return pMsg->prevLogTerm == localPrevTerm;
}

M
Minghao Li 已提交
454 455
// Reconciles the local log with the entry carried by pMsg when the local log
// already holds an entry at prevLogIndex + 1.
//
// The carried entry is deserialized into *ppAppendEntry (the caller releases it
// with syncEntryDestory). Then:
//   - same term at that index: *pEntryAlreadyWritten is set to true and the
//     log is left untouched;
//   - different term: rollback callbacks are invoked for the local entries from
//     the last index down to prevLogIndex + 1, and the log is truncated at
//     prevLogIndex + 1. The caller is expected to append *ppAppendEntry.
//
// ths                  : local sync node
// pMsg                 : received message carrying exactly one serialized entry
// ppAppendEntry        : out, deserialized entry (set to NULL first)
// pEntryAlreadyWritten : out, see above (set to false first)
//
// Always returns 0; failures are trapped by ASSERT.
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry,
                                   bool* pEntryAlreadyWritten) {
  int32_t code = 0;
  *ppAppendEntry = NULL;
  *pEntryAlreadyWritten = false;

  // not conflict by default
  bool conflict = false;

  SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
  SSyncRaftEntry* pExtraEntry = NULL;
  code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry);
  ASSERT(code == 0);
  ASSERT(pExtraEntry != NULL);

  *ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
  ASSERT(*ppAppendEntry != NULL);

  ASSERT(extraIndex == (*ppAppendEntry)->index);
  if (pExtraEntry->term != (*ppAppendEntry)->term) {
    // log not match, conflict, need delete
    conflict = true;
  } else {
    // log match, already written
    *pEntryAlreadyWritten = true;
    sInfo("entry already written, term:%lu, index:%ld", pExtraEntry->term, pExtraEntry->index);
  }
  syncEntryDestory(pExtraEntry);

  if (conflict) {
    // roll back
    SyncIndex delBegin = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
    SyncIndex delEnd = extraIndex;

    sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);

    // notice! reverse roll back!
    // Walk from the last local entry (delBegin) down to the first conflicting
    // one (delEnd). The caller only gets here when the local log extends past
    // prevLogIndex, so delEnd <= delBegin; starting the loop at delEnd would
    // skip every entry unless exactly one entry conflicts.
    if (ths->pFsm != NULL && ths->pFsm->FpRollBackCb != NULL) {
      for (SyncIndex index = delBegin; index >= delEnd; --index) {
        SSyncRaftEntry* pRollBackEntry = NULL;
        code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
        ASSERT(code == 0);
        ASSERT(pRollBackEntry != NULL);

        if (syncUtilUserRollback(pRollBackEntry->msgType)) {
          SRpcMsg rpcMsg;
          syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

          // zero-init so fields not assigned below are never indeterminate
          SFsmCbMeta cbMeta = {0};
          cbMeta.index = pRollBackEntry->index;
          cbMeta.isWeak = pRollBackEntry->isWeak;
          cbMeta.code = 0;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pRollBackEntry->seqNum;
          ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
          rpcFreeCont(rpcMsg.pCont);
        }

        syncEntryDestory(pRollBackEntry);
      }
    }

    // delete conflict entries
    code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex);
    ASSERT(code == 0);
  }

  return 0;
}

// Invokes the FSM pre-commit callback for pEntry.
//
// The entry is converted back to its original rpc message. FpPreCommitCb is
// called only when an FSM with that callback is installed and
// syncUtilUserPreCommit() accepts the entry's originalRpcType. The rpc content
// is released before returning, whether or not the callback ran.
//
// ths    : local sync node
// pEntry : entry that was just appended; not released here
//
// Always returns 0.
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  SRpcMsg rpcMsg;
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  if (ths->pFsm != NULL && ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
    // zero-init: the struct is handed to the callback by value, so members not
    // assigned below (e.g. term, currentTerm, flag) must not be indeterminate
    SFsmCbMeta cbMeta = {0};
    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.code = 2;
    cbMeta.state = ths->state;
    cbMeta.seqNum = pEntry->seqNum;
    ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
  }

  rpcFreeCont(rpcMsg.pCont);
  return 0;
}

542 543
// Snapshot-aware handler for a received SyncAppendEntries message.
//
// Flow:
//   1. update the local term when the message term is newer; reset the election
//      timer and remember the sender as leader when the terms are equal;
//   2. reject (reply success=false) when the message term is stale, or when this
//      node is a follower and syncNodeOnAppendEntriesLogOK() fails;
//   3. a candidate seeing its own term becomes follower and returns, no reply;
//   4. a follower with a consistent log makes its log agree with the carried
//      entry, appends it if needed, replies success=true, and commits up to the
//      leader's commitIndex when that index is present locally.
//
// The reply echoes pMsg->privateTerm. Always returns 0; failures are trapped
// by ASSERT.
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;
  int32_t code = 0;

  // print log
  char traceBuf[128] = {0};
  snprintf(traceBuf, sizeof(traceBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(traceBuf, pMsg);

  // maybe update term
  if (ths->pRaftStore->currentTerm < pMsg->term) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  ASSERT(pMsg->dataLen >= 0);

  bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);

  // case1, reject request
  bool staleTerm = pMsg->term < ths->pRaftStore->currentTerm;
  bool followerLogMismatch =
      (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK;
  if (staleTerm || followerLogMismatch) {
    sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term,
           ths->pRaftStore->currentTerm, ths->state, logOK);

    // send response
    SyncAppendEntriesReply* pRsp = syncAppendEntriesReplyBuild(ths->vgId);
    pRsp->srcId = ths->myRaftId;
    pRsp->destId = pMsg->srcId;
    pRsp->term = ths->pRaftStore->currentTerm;
    pRsp->success = false;
    pRsp->matchIndex = SYNC_INDEX_INVALID;
    pRsp->privateTerm = pMsg->privateTerm;

    SRpcMsg rspMsg;
    syncAppendEntriesReply2RpcMsg(pRsp, &rspMsg);
    syncNodeSendMsgById(&pRsp->destId, ths, &rspMsg);
    syncAppendEntriesReplyDestroy(pRsp);

    return ret;
  }

  // case 2, return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d",
           pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // case 3, accept request -- anything that is not an acceptable request ends here
  if (!(pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK)) {
    return ret;
  }

  // has extra entries (> preIndex) in local log
  SyncIndex localLastIndex = syncNodeGetLastIndex(ths);
  bool      hasExtraEntries = localLastIndex > pMsg->prevLogIndex;

  // has entries in SyncAppendEntries msg
  bool hasAppendEntries = pMsg->dataLen > 0;

  sTrace(
      "recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, "
      "hasExtraEntries:%d, hasAppendEntries:%d",
      pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

  // a message without entries never modifies the log
  if (hasAppendEntries) {
    SSyncRaftEntry* pNewEntry = NULL;
    bool            needAppend = true;

    if (hasExtraEntries) {
      // make log same
      bool alreadyWritten = false;
      code = syncNodeMakeLogSame(ths, pMsg, &pNewEntry, &alreadyWritten);
      ASSERT(code == 0);
      ASSERT(pNewEntry != NULL);
      needAppend = !alreadyWritten;
    } else {
      pNewEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      ASSERT(pNewEntry != NULL);
    }

    if (needAppend) {
      // append new entries
      code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pNewEntry);
      ASSERT(code == 0);

      // pre commit
      code = syncNodePreCommit(ths, pNewEntry);
      ASSERT(code == 0);
    }

    syncEntryDestory(pNewEntry);
  }

  // prepare response msg
  SyncAppendEntriesReply* pRsp = syncAppendEntriesReplyBuild(ths->vgId);
  pRsp->srcId = ths->myRaftId;
  pRsp->destId = pMsg->srcId;
  pRsp->term = ths->pRaftStore->currentTerm;
  pRsp->success = true;
  pRsp->privateTerm = pMsg->privateTerm;
  pRsp->matchIndex = hasAppendEntries ? (pMsg->prevLogIndex + 1) : pMsg->prevLogIndex;

  // send response
  SRpcMsg rspMsg;
  syncAppendEntriesReply2RpcMsg(pRsp, &rspMsg);
  syncNodeSendMsgById(&pRsp->destId, ths, &rspMsg);
  syncAppendEntriesReplyDestroy(pRsp);

  // maybe update commit index from leader: only when the leader is ahead of us
  // and the entry at its commit index is already present in the local log
  if (pMsg->commitIndex > ths->commitIndex &&
      pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
    SyncIndex firstToCommit = ths->commitIndex + 1;
    SyncIndex lastToCommit = pMsg->commitIndex;

    // update commit index
    ths->commitIndex = pMsg->commitIndex;

    // call back Wal
    code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
    ASSERT(code == 0);

    code = syncNodeCommit(ths, firstToCommit, lastToCommit, ths->state);
    ASSERT(code == 0);
  }

  return ret;
}