syncAppendEntries.c 29.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncAppendEntries.h"
M
Minghao Li 已提交
17
#include "syncInt.h"
M
Minghao Li 已提交
18
#include "syncRaftCfg.h"
M
Minghao Li 已提交
19 20 21 22
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
//    LET logOk == \/ m.mprevLogIndex = 0
//                 \/ /\ m.mprevLogIndex > 0
//                    /\ m.mprevLogIndex <= Len(log[i])
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ /\ \* reject request
//                \/ m.mterm < currentTerm[i]
//                \/ /\ m.mterm = currentTerm[i]
//                   /\ state[i] = Follower
//                   /\ \lnot logOk
//             /\ Reply([mtype           |-> AppendEntriesResponse,
//                       mterm           |-> currentTerm[i],
//                       msuccess        |-> FALSE,
//                       mmatchIndex     |-> 0,
//                       msource         |-> i,
//                       mdest           |-> j],
//                       m)
//             /\ UNCHANGED <<serverVars, logVars>>
//          \/ \* return to follower state
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Candidate
//             /\ state' = [state EXCEPT ![i] = Follower]
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
//          \/ \* accept request
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Follower
//             /\ logOk
//             /\ LET index == m.mprevLogIndex + 1
//                IN \/ \* already done with request
//                       /\ \/ m.mentries = << >>
//                          \/ /\ m.mentries /= << >>
//                             /\ Len(log[i]) >= index
//                             /\ log[i][index].term = m.mentries[1].term
//                          \* This could make our commitIndex decrease (for
//                          \* example if we process an old, duplicated request),
//                          \* but that doesn't really affect anything.
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
//                                              m.mcommitIndex]
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
//                                 mterm           |-> currentTerm[i],
//                                 msuccess        |-> TRUE,
//                                 mmatchIndex     |-> m.mprevLogIndex +
//                                                     Len(m.mentries),
//                                 msource         |-> i,
//                                 mdest           |-> j],
//                                 m)
//                       /\ UNCHANGED <<serverVars, log>>
//                   \/ \* conflict: remove 1 entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) >= index
//                       /\ log[i][index].term /= m.mentries[1].term
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
//                                          log[i][index2]]
//                          IN log' = [log EXCEPT ![i] = new]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//                   \/ \* no conflict: append entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) = m.mprevLogIndex
//                       /\ log' = [log EXCEPT ![i] =
//                                      Append(log[i], m.mentries[1])]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//       /\ UNCHANGED <<candidateVars, leaderVars>>
//
M
Minghao Li 已提交
89 90
// Handle an AppendEntries request received from a peer (implements the
// HandleAppendEntriesRequest action of the Raft TLA+ spec).
//
// Processing order:
//   1. If the message term is newer than the local term, adopt it through
//      syncNodeUpdateTerm().
//   2. If the message term equals the local term, cache the sender as leader
//      and reset the election timer.
//   3. Reject (success = false, matchIndex = SYNC_INDEX_INVALID) when the
//      message term is stale, or when this node is a follower in the same term
//      and the prevLogIndex/prevLogTerm check (logOK) fails.
//   4. If this node is a candidate in the same term, step down to follower and
//      return without sending a reply.
//   5. Otherwise (follower, same term, logOK): reconcile the local log with the
//      single entry carried by the message, reply success, then apply every
//      entry newly covered by the leader's commit index.
//
// Parameters:
//   ths   local sync node
//   pMsg  the received AppendEntries message
//
// Returns: always 0; invariant violations are reported through assert().
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;

  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  assert(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  assert(pMsg->dataLen >= 0);

  // Term of the local entry at prevLogIndex; stays 0 when that index is not in the local log.
  SyncTerm localPreLogTerm = 0;
  if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
    SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
    assert(pEntry != NULL);
    localPreLogTerm = pEntry->term;
    syncEntryDestory(pEntry);
  }

  // logOK: either the leader sends from the very beginning of the log, or the
  // local log holds an entry at prevLogIndex whose term matches prevLogTerm.
  bool logOK =
      (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
      ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
       (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));

  // reject request
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace(
        "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
    // preIndex = -1, or has preIndex entry in local log
    assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));

    // has extra entries (> preIndex) in local log
    bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);

    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;

    sTrace(
        "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
        "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

    if (hasExtraEntries && hasAppendEntries) {
      // not conflict by default
      bool conflict = false;

      SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
      SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
      assert(pExtraEntry != NULL);

      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // log not match, conflict
      assert(extraIndex == pAppendEntry->index);
      if (pExtraEntry->term != pAppendEntry->term) {
        conflict = true;
      }

      if (conflict) {
        // roll back
        SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
        SyncIndex delEnd = extraIndex;

        sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);

        // notice! reverse roll back!
        // Walk from the newest local entry (delBegin) down to the first
        // conflicting one (delEnd). The previous bounds were swapped, so the
        // loop body ran at most once and skipped the rollback callback whenever
        // more than one entry had to be discarded.
        if (ths->pFsm != NULL && ths->pFsm->FpRollBackCb != NULL) {
          for (SyncIndex index = delBegin; index >= delEnd; --index) {
            SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
            assert(pRollBackEntry != NULL);

            // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
            if (syncUtilUserRollback(pRollBackEntry->msgType)) {
              SRpcMsg rollBackMsg;
              syncEntry2OriginalRpc(pRollBackEntry, &rollBackMsg);

              SFsmCbMeta cbMeta;
              cbMeta.index = pRollBackEntry->index;
              cbMeta.isWeak = pRollBackEntry->isWeak;
              cbMeta.code = 0;
              cbMeta.state = ths->state;
              cbMeta.seqNum = pRollBackEntry->seqNum;
              ths->pFsm->FpRollBackCb(ths->pFsm, &rollBackMsg, cbMeta);
              rpcFreeCont(rollBackMsg.pCont);
            }

            syncEntryDestory(pRollBackEntry);
          }
        }

        // delete conflict entries
        ths->pLogStore->truncate(ths->pLogStore, extraIndex);

        // append new entries
        ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

        // pre commit
        SRpcMsg preCommitMsg;
        syncEntry2OriginalRpc(pAppendEntry, &preCommitMsg);
        if (ths->pFsm != NULL) {
          // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
          if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
            SFsmCbMeta cbMeta;
            cbMeta.index = pAppendEntry->index;
            cbMeta.isWeak = pAppendEntry->isWeak;
            cbMeta.code = 2;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pAppendEntry->seqNum;
            ths->pFsm->FpPreCommitCb(ths->pFsm, &preCommitMsg, cbMeta);
          }
        }
        rpcFreeCont(preCommitMsg.pCont);
      }

      // free memory
      syncEntryDestory(pExtraEntry);
      syncEntryDestory(pAppendEntry);

    } else if (hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else if (!hasExtraEntries && hasAppendEntries) {
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // append new entries
      ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

      // pre commit
      SRpcMsg preCommitMsg;
      syncEntry2OriginalRpc(pAppendEntry, &preCommitMsg);
      if (ths->pFsm != NULL) {
        // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
        if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
          SFsmCbMeta cbMeta;
          cbMeta.index = pAppendEntry->index;
          cbMeta.isWeak = pAppendEntry->isWeak;
          cbMeta.code = 3;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pAppendEntry->seqNum;
          ths->pFsm->FpPreCommitCb(ths->pFsm, &preCommitMsg, cbMeta);
        }
      }
      rpcFreeCont(preCommitMsg.pCont);

      // free memory
      syncEntryDestory(pAppendEntry);

    } else if (!hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else {
      assert(0);
    }

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    // The message carries at most one entry, so the match index advances by at most one.
    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    SRpcMsg replyMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &replyMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &replyMsg);
    syncAppendEntriesReplyDestroy(pReply);

    // maybe update commit index from leader
    if (pMsg->commitIndex > ths->commitIndex) {
      // has commit entry in local
      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
        ths->commitIndex = pMsg->commitIndex;

        // call back Wal
        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);

        // execute fsm
        if (ths->pFsm != NULL) {
          for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
            if (i != SYNC_INDEX_INVALID) {
              SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
              assert(pEntry != NULL);

              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pEntry, &rpcMsg);

              if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
                SFsmCbMeta cbMeta;
                cbMeta.index = pEntry->index;
                cbMeta.isWeak = pEntry->isWeak;
                cbMeta.code = 0;
                cbMeta.state = ths->state;
                cbMeta.seqNum = pEntry->seqNum;
                cbMeta.term = pEntry->term;
                cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                cbMeta.flag = 0x11;

                SSnapshot snapshot;
                ASSERT(ths->pFsm->FpGetSnapshot != NULL);
                ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

                // Skip entries at or below the snapshot's lastApplyIndex.
                bool needExecute = true;
                if (cbMeta.index <= snapshot.lastApplyIndex) {
                  needExecute = false;
                }

                if (needExecute) {
                  ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
                }
              }

              // config change
              if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
                SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

                SSyncCfg newSyncCfg;
                int32_t  cfgCode = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
                ASSERT(cfgCode == 0);

                // update new config myIndex
                bool hit = false;
                for (int k = 0; k < newSyncCfg.replicaNum; ++k) {
                  if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[k].nodeFqdn) == 0 &&
                      ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[k].nodePort) {
                    newSyncCfg.myIndex = k;
                    hit = true;
                    break;
                  }
                }

                SReConfigCbMeta cbMeta = {0};
                // Initialized so the re-config callback never receives an
                // indeterminate value when this node is absent from the new
                // config (syncNodeUpdateConfig is not called in that case).
                // NOTE(review): false is a conservative default — confirm the
                // value callers expect for a node that is not in the new config.
                bool isDrop = false;

                // I am in newConfig
                if (hit) {
                  syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);

                  // change isStandBy to normal
                  if (!isDrop) {
                    if (ths->state == TAOS_SYNC_STATE_LEADER) {
                      syncNodeBecomeLeader(ths);
                    } else {
                      syncNodeBecomeFollower(ths);
                    }
                  }

                  char* sOld = syncCfg2Str(&oldSyncCfg);
                  char* sNew = syncCfg2Str(&newSyncCfg);
                  sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
                  taosMemoryFree(sOld);
                  taosMemoryFree(sNew);
                }

                // always call FpReConfigCb
                if (ths->pFsm->FpReConfigCb != NULL) {
                  cbMeta.code = 0;
                  cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                  cbMeta.index = pEntry->index;
                  cbMeta.term = pEntry->term;
                  cbMeta.oldCfg = oldSyncCfg;
                  cbMeta.flag = 0x11;
                  cbMeta.isDrop = isDrop;
                  ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
                }
              }

              // restore finish
              if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
                if (ths->restoreFinish == false) {
                  if (ths->pFsm->FpRestoreFinishCb != NULL) {
                    ths->pFsm->FpRestoreFinishCb(ths->pFsm);
                  }
                  ths->restoreFinish = true;
                  sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);

                  /*
                  tsem_post(&ths->restoreSem);
                  sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
                  */
                }
              }

              rpcFreeCont(rpcMsg.pCont);
              syncEntryDestory(pEntry);
            }
          }
        }
      }
    }
  }

  return ret;
}
M
Minghao Li 已提交
434

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  int32_t ret = 0;

  char logBuf[128] = {0};
  snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesSnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
  syncAppendEntriesLog2(logBuf, pMsg);

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  assert(pMsg->term <= ths->pRaftStore->currentTerm);

  // reset elect timer
  if (pMsg->term == ths->pRaftStore->currentTerm) {
    ths->leaderCache = pMsg->srcId;
    syncNodeResetElectTimer(ths);
  }
  assert(pMsg->dataLen >= 0);

  SyncIndex localPreLogIndex;
  SyncTerm  localPreLogTerm;
  ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm);
  ASSERT(ret == 0);

  SyncIndex localLastIndex;
  SyncTerm  localLastTerm;
  ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm);
  ASSERT(ret == 0);

M
Minghao Li 已提交
464 465 466 467 468 469 470 471 472 473
  bool logOK;
  if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) {
    SSnapshot snapshot;
    ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

    // maybe this assert is error, because replica take takesnapshot separately
    // leader will reset next index to newest
    ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex);

    logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm);
M
Minghao Li 已提交
474 475 476 477
    sTrace(
        "1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, "
        "snapshot.lastApplyTerm:%lu",
        logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm);
M
Minghao Li 已提交
478 479 480 481 482 483

  } else {
    logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
            ((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
             (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) &&
             (pMsg->prevLogTerm == localPreLogTerm));
M
Minghao Li 已提交
484 485
    sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK,
           pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm);
M
Minghao Li 已提交
486
  }
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803

  // reject request
  if ((pMsg->term < ths->pRaftStore->currentTerm) ||
      ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
    sTrace(
        "syncNodeOnAppendEntriesSnapshotCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, "
        "logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = false;
    pReply->matchIndex = SYNC_INDEX_INVALID;

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    return ret;
  }

  // return to follower state
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
    sTrace(
        "syncNodeOnAppendEntriesSnapshotCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, logOK:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);

    syncNodeBecomeFollower(ths);

    // ret or reply?
    return ret;
  }

  // accept request
  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
    // preIndex = -1, or has preIndex entry in local log
    assert(pMsg->prevLogIndex <= localLastIndex);

    // has extra entries (> preIndex) in local log
    bool hasExtraEntries = pMsg->prevLogIndex < localLastIndex;

    // has entries in SyncAppendEntries msg
    bool hasAppendEntries = pMsg->dataLen > 0;

    sTrace(
        "syncNodeOnAppendEntriesSnapshotCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
        "ths->state:%d, "
        "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
        pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);

    if (hasExtraEntries && hasAppendEntries) {
      // not conflict by default
      bool conflict = false;

      SyncIndex       extraIndex = pMsg->prevLogIndex + 1;
      SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
      assert(pExtraEntry != NULL);

      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // log not match, conflict
      assert(extraIndex == pAppendEntry->index);
      if (pExtraEntry->term != pAppendEntry->term) {
        conflict = true;
      }

      if (conflict) {
        // roll back
        SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
        SyncIndex delEnd = extraIndex;

        sTrace("syncNodeOnAppendEntriesSnapshotCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin,
               delEnd);

        // notice! reverse roll back!
        for (SyncIndex index = delEnd; index >= delBegin; --index) {
          if (ths->pFsm->FpRollBackCb != NULL) {
            SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
            assert(pRollBackEntry != NULL);

            // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
            if (syncUtilUserRollback(pRollBackEntry->msgType)) {
              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);

              SFsmCbMeta cbMeta;
              cbMeta.index = pRollBackEntry->index;
              cbMeta.isWeak = pRollBackEntry->isWeak;
              cbMeta.code = 0;
              cbMeta.state = ths->state;
              cbMeta.seqNum = pRollBackEntry->seqNum;
              ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
              rpcFreeCont(rpcMsg.pCont);
            }

            syncEntryDestory(pRollBackEntry);
          }
        }

        // delete confict entries
        ths->pLogStore->truncate(ths->pLogStore, extraIndex);

        // append new entries
        ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

        // pre commit
        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
        if (ths->pFsm != NULL) {
          // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
          if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
            SFsmCbMeta cbMeta;
            cbMeta.index = pAppendEntry->index;
            cbMeta.isWeak = pAppendEntry->isWeak;
            cbMeta.code = 2;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pAppendEntry->seqNum;
            ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
          }
        }
        rpcFreeCont(rpcMsg.pCont);
      }

      // free memory
      syncEntryDestory(pExtraEntry);
      syncEntryDestory(pAppendEntry);

    } else if (hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else if (!hasExtraEntries && hasAppendEntries) {
      SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
      assert(pAppendEntry != NULL);

      // append new entries
      ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);

      // pre commit
      SRpcMsg rpcMsg;
      syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
      if (ths->pFsm != NULL) {
        // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
        if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
          SFsmCbMeta cbMeta;
          cbMeta.index = pAppendEntry->index;
          cbMeta.isWeak = pAppendEntry->isWeak;
          cbMeta.code = 3;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pAppendEntry->seqNum;
          ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
        }
      }
      rpcFreeCont(rpcMsg.pCont);

      // free memory
      syncEntryDestory(pAppendEntry);

    } else if (!hasExtraEntries && !hasAppendEntries) {
      // do nothing

    } else {
      assert(0);
    }

    SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
    pReply->srcId = ths->myRaftId;
    pReply->destId = pMsg->srcId;
    pReply->term = ths->pRaftStore->currentTerm;
    pReply->success = true;

    if (hasAppendEntries) {
      pReply->matchIndex = pMsg->prevLogIndex + 1;
    } else {
      pReply->matchIndex = pMsg->prevLogIndex;
    }

    SRpcMsg rpcMsg;
    syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
    syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
    syncAppendEntriesReplyDestroy(pReply);

    // maybe update commit index from leader
    if (pMsg->commitIndex > ths->commitIndex) {
      // has commit entry in local
      if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
        SyncIndex beginIndex = ths->commitIndex + 1;
        SyncIndex endIndex = pMsg->commitIndex;

        // update commit index
        ths->commitIndex = pMsg->commitIndex;

        // call back Wal
        ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);

        // execute fsm
        if (ths->pFsm != NULL) {
          for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
            if (i != SYNC_INDEX_INVALID) {
              SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
              assert(pEntry != NULL);

              SRpcMsg rpcMsg;
              syncEntry2OriginalRpc(pEntry, &rpcMsg);

              if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
                SFsmCbMeta cbMeta;
                cbMeta.index = pEntry->index;
                cbMeta.isWeak = pEntry->isWeak;
                cbMeta.code = 0;
                cbMeta.state = ths->state;
                cbMeta.seqNum = pEntry->seqNum;
                cbMeta.term = pEntry->term;
                cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                cbMeta.flag = 0x11;

                SSnapshot snapshot;
                ASSERT(ths->pFsm->FpGetSnapshot != NULL);
                ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);

                bool needExecute = true;
                if (cbMeta.index <= snapshot.lastApplyIndex) {
                  needExecute = false;
                }

                if (needExecute) {
                  ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
                }
              }

              // config change
              if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
                SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

                SSyncCfg newSyncCfg;
                int32_t  ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
                ASSERT(ret == 0);

                // update new config myIndex
                bool hit = false;
                for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
                  if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
                      ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
                    newSyncCfg.myIndex = i;
                    hit = true;
                    break;
                  }
                }

                SReConfigCbMeta cbMeta = {0};
                bool            isDrop;

                // I am in newConfig
                if (hit) {
                  syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);

                  // change isStandBy to normal
                  if (!isDrop) {
                    if (ths->state == TAOS_SYNC_STATE_LEADER) {
                      syncNodeBecomeLeader(ths);
                    } else {
                      syncNodeBecomeFollower(ths);
                    }
                  }

                  char* sOld = syncCfg2Str(&oldSyncCfg);
                  char* sNew = syncCfg2Str(&newSyncCfg);
                  sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
                  taosMemoryFree(sOld);
                  taosMemoryFree(sNew);
                }

                // always call FpReConfigCb
                if (ths->pFsm->FpReConfigCb != NULL) {
                  cbMeta.code = 0;
                  cbMeta.currentTerm = ths->pRaftStore->currentTerm;
                  cbMeta.index = pEntry->index;
                  cbMeta.term = pEntry->term;
                  cbMeta.oldCfg = oldSyncCfg;
                  cbMeta.flag = 0x11;
                  cbMeta.isDrop = isDrop;
                  ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
                }
              }

              // restore finish
              if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
                if (ths->restoreFinish == false) {
                  if (ths->pFsm->FpRestoreFinishCb != NULL) {
                    ths->pFsm->FpRestoreFinishCb(ths->pFsm);
                  }
                  ths->restoreFinish = true;
                  sInfo("==syncNodeOnAppendEntriesSnapshotCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);

                  /*
                  tsem_post(&ths->restoreSem);
                  sInfo("==syncNodeOnAppendEntriesSnapshotCb== RestoreFinish tsem_post %p", ths);
                  */
                }
              }

              rpcFreeCont(rpcMsg.pCont);
              syncEntryDestory(pEntry);
            }
          }
        }
      }
    }
  }

  return ret;
}