syncAppendEntries.c 29.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncAppendEntries.h"
18
#include "syncMessage.h"
M
Minghao Li 已提交
19 20
#include "syncRaftLog.h"
#include "syncRaftStore.h"
B
Benguang Zhao 已提交
21
#include "syncReplication.h"
M
Minghao Li 已提交
22
#include "syncUtil.h"
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
//    LET logOk == \/ m.mprevLogIndex = 0
//                 \/ /\ m.mprevLogIndex > 0
//                    /\ m.mprevLogIndex <= Len(log[i])
//                    /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ /\ \* reject request
//                \/ m.mterm < currentTerm[i]
//                \/ /\ m.mterm = currentTerm[i]
//                   /\ state[i] = Follower
//                   /\ \lnot logOk
//             /\ Reply([mtype           |-> AppendEntriesResponse,
//                       mterm           |-> currentTerm[i],
//                       msuccess        |-> FALSE,
//                       mmatchIndex     |-> 0,
//                       msource         |-> i,
//                       mdest           |-> j],
//                       m)
//             /\ UNCHANGED <<serverVars, logVars>>
//          \/ \* return to follower state
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Candidate
//             /\ state' = [state EXCEPT ![i] = Follower]
//             /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
//          \/ \* accept request
//             /\ m.mterm = currentTerm[i]
//             /\ state[i] = Follower
//             /\ logOk
//             /\ LET index == m.mprevLogIndex + 1
//                IN \/ \* already done with request
//                       /\ \/ m.mentries = << >>
//                          \/ /\ m.mentries /= << >>
//                             /\ Len(log[i]) >= index
//                             /\ log[i][index].term = m.mentries[1].term
//                          \* This could make our commitIndex decrease (for
//                          \* example if we process an old, duplicated request),
//                          \* but that doesn't really affect anything.
//                       /\ commitIndex' = [commitIndex EXCEPT ![i] =
//                                              m.mcommitIndex]
//                       /\ Reply([mtype           |-> AppendEntriesResponse,
//                                 mterm           |-> currentTerm[i],
//                                 msuccess        |-> TRUE,
//                                 mmatchIndex     |-> m.mprevLogIndex +
//                                                     Len(m.mentries),
//                                 msource         |-> i,
//                                 mdest           |-> j],
//                                 m)
//                       /\ UNCHANGED <<serverVars, log>>
//                   \/ \* conflict: remove 1 entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) >= index
//                       /\ log[i][index].term /= m.mentries[1].term
//                       /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
//                                          log[i][index2]]
//                          IN log' = [log EXCEPT ![i] = new]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//                   \/ \* no conflict: append entry
//                       /\ m.mentries /= << >>
//                       /\ Len(log[i]) = m.mprevLogIndex
//                       /\ log' = [log EXCEPT ![i] =
//                                      Append(log[i], m.mentries[1])]
//                       /\ UNCHANGED <<serverVars, commitIndex, messages>>
//       /\ UNCHANGED <<candidateVars, leaderVars>>
//
89

90
/*
 * Advance a follower's commit index to newCommitIndex (as notified by the
 * leader) and hand the newly committed range to syncNodeDoCommit.
 *
 * Nothing happens unless newCommitIndex is above the current commit index and
 * no greater than the last index in the local log. If the FSM snapshot has
 * already applied entries past the current commit index, the commit index is
 * first moved up to the snapshot's last applied index. The commit index never
 * moves backwards.
 *
 * Returns -1 if this node is not a follower, 0 otherwise.
 */
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "can not do follower commit");
    return -1;
  }

  // maybe update commit index, leader notice me; only entries held in the local log can be committed
  if (newCommitIndex <= ths->commitIndex || newCommitIndex > ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
    return 0;
  }

  // Advance commit index to the snapshot first. Skipped when the snapshot info
  // cannot be read, since `snapshot` would then be uninitialized.
  SSnapshot snapshot;
  if (ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot) >= 0 && snapshot.lastApplyIndex >= 0 &&
      snapshot.lastApplyIndex > ths->commitIndex) {
    SyncIndex commitBegin = ths->commitIndex;
    SyncIndex commitEnd = snapshot.lastApplyIndex;
    ths->commitIndex = snapshot.lastApplyIndex;
    sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, commitEnd);
  }

  SyncIndex beginIndex = ths->commitIndex + 1;
  SyncIndex endIndex = newCommitIndex;
  if (endIndex < ths->commitIndex) {
    // The snapshot can be ahead of a stale leader notification; never move the
    // commit index backwards.
    endIndex = ths->commitIndex;
  }

  // update commit index
  ths->commitIndex = endIndex;

  // call back Wal
  int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
  ASSERT(code == 0);

  // beginIndex > endIndex when the snapshot already covers the notified index
  code = syncNodeDoCommit(ths, beginIndex, endIndex, ths->state);
  ASSERT(code == 0);

  return 0;
}

B
Benguang Zhao 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
/*
 * Build a placeholder raft entry at (term, index) for vgroup vgId.
 * The placeholder is simply a no-op entry; the caller owns the result
 * (NULL on failure, as returned by syncEntryBuildNoop).
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}

/*
 * Initialize the in-memory log buffer from the FSM snapshot and the log store.
 *
 * The snapshot's last applied (index, term) becomes the buffer's commit point,
 * and the last log-store index becomes its match index. Entries in
 * (commitIndex, lastVer] are loaded newest-first for as many as fit in the
 * ring. Each loaded entry records the index/term of its predecessor. If the
 * whole tail fits, a dummy (no-op) entry is placed at commitIndex.
 *
 * Returns 0 on success, -1 on failure (terrno set). On failure after loading
 * has started, entries already placed remain in pBuf->entries.
 */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  ASSERT(pNode->pLogStore != NULL && "log store not created");
  ASSERT(pNode->pFsm != NULL && "pFsm not registered");
  ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");

  // the snapshot's last applied position is the commit point of the buffer
  SSnapshot snapshot;
  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
    goto _err;
  }
  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = snapshot.lastApplyTerm;

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
           ", tsdb commit version: %" PRId64 "",
           pNode->vgId, lastVer, commitIndex);
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    goto _err;
  }
  // from here on lastVer >= commitIndex
  SyncIndex toIndex = lastVer;

  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order (newest first)
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;

  while (true) {
    // reached the commit point: that slot gets a dummy entry below
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      // Fail the init instead of publishing a half-loaded buffer whose
      // startIndex and prevLog links would be inconsistent.
      goto _err;
    }

    // keep the entry only if it still fits in the ring below toIndex
    bool taken = false;
    if (toIndex <= index + pBuf->size - 1) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
    }

    // link the already-loaded successor to this entry
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    // ring is full: this entry was only needed for its successor's prevLog info
    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    ASSERT(index == pBuf->commitIndex);

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    // commitIndex may be -1, hence the "+ size" before the modulo
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex: the dummy slot, or the oldest entry actually taken
  pBuf->startIndex = takeDummy ? index : index + 1;

  // validate
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_err:
  taosThreadMutexUnlock(&pBuf->mutex);
  return -1;
}

B
Benguang Zhao 已提交
232 233 234 235 236 237 238
/*
 * Return the term of the entry stored at the buffer's match index.
 * The slot is asserted to be populated.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  // matchIndex may be -1, hence the "+ size" before the modulo
  SyncIndex       slot = (pBuf->matchIndex + pBuf->size) % pBuf->size;
  SSyncRaftEntry* pMatchEntry = pBuf->entries[slot].pItem;
  ASSERT(pMatchEntry != NULL);
  return pMatchEntry->term;
}

B
Benguang Zhao 已提交
239 240 241
/*
 * Offer an entry received from the leader to the log buffer.
 *
 * This function always takes ownership of pEntry. The entry is either stored
 * in its slot or destroyed before returning.
 *
 * Returns 0 in three cases: the entry was stored, it is already committed, or
 * it duplicates a buffered entry with the same term. Returns -1 in two cases:
 * it falls outside the buffer's capacity, or its index is past the match index
 * and prevTerm differs from the term of the entry at the match index.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  int32_t   code = -1;
  SyncIndex idx = pEntry->index;
  SyncIndex prevIdx = idx - 1;
  SyncTerm  matchTerm = syncLogBufferGetLastMatchTerm(pBuf);

  // already committed: nothing to do, and not an error
  if (idx <= pBuf->commitIndex) {
    sDebug("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
           " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    code = 0;
    goto _out;
  }

  // too far ahead to fit in the ring
  if (idx - pBuf->startIndex >= pBuf->size) {
    sDebug("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
           " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    goto _out;
  }

  // past the match point, prevTerm must agree with the last matched term
  if (idx > pBuf->matchIndex && matchTerm != prevTerm) {
    sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64
          " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, matchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    goto _out;
  }

  // is something already buffered at this position?
  SSyncLogBufEntry* pSlot = &pBuf->entries[idx % pBuf->size];
  SSyncRaftEntry*   pExist = pSlot->pItem;
  if (pExist != NULL) {
    ASSERT(pEntry->index == pExist->index);

    if (pEntry->term == pExist->term) {
      // same index and term: a duplicate delivery
      sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = pSlot->prevLogTerm;
      ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm);
      code = 0;
      goto _out;
    }

    // same index, different term: roll the buffer back to this index
    // (presumably discarding the conflicting suffix) before storing
    (void)syncLogBufferRollback(pBuf, idx);
  }

  // store the entry; from here on the buffer owns it
  pBuf->entries[idx % pBuf->size] =
      (SSyncLogBufEntry){.pItem = pEntry, .prevLogIndex = prevIdx, .prevLogTerm = prevTerm};
  pEntry = NULL;

  // extend the buffered range if needed
  pBuf->endIndex = TMAX(idx + 1, pBuf->endIndex);
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return code;
}

/*
 * Copy the serialized raft entry carried in pMsg->data (pMsg->dataLen bytes)
 * into a newly allocated SSyncRaftEntry owned by the caller.
 *
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if allocation fails.
 * Asserts that the copied entry's `bytes` field equals pMsg->dataLen.
 */
SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
  SSyncRaftEntry* pCopy = taosMemoryMalloc(pMsg->dataLen);
  if (pCopy != NULL) {
    (void)memcpy(pCopy, pMsg->data, pMsg->dataLen);
    ASSERT(pCopy->bytes == pMsg->dataLen);
  } else {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return pCopy;
}

/*
 * Write pEntry into the log store at position pEntry->index.
 *
 * Existing entries at or after that index are truncated first, so on success
 * the log ends exactly at pEntry->index. Returns 0 on success, -1 if the
 * truncate or the append fails.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  ASSERT(pEntry->index >= 0);

  // drop any entries that would be overwritten
  SyncIndex lastIndex = pLogStore->syncLogLastIndex(pLogStore);
  if (lastIndex >= pEntry->index) {
    if (pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
      sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
      return -1;
    }
  }

  // the new entry must extend the log by exactly one
  lastIndex = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastIndex + 1);

  if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
    sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
           pEntry->term);
    return -1;
  }

  lastIndex = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastIndex);
  return 0;
}

/*
 * Advance the buffer's match index over consecutive buffered entries whose
 * prevLogTerm agrees with the entry currently at the match index. For each
 * advance, replication is triggered, the entry is persisted to the log store,
 * and this node's own match index is updated in pNode->pMatchIndex.
 *
 * Stops at the first gap, term mismatch or persist failure. The match index
 * left in the buffer is the last one whose persist succeeded. Returns that
 * match index.
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  // last index whose persist succeeded; starts at the current match index
  int64_t persisted = pBuf->matchIndex;

  for (int64_t next = pBuf->matchIndex + 1; next < pBuf->endIndex; next = pBuf->matchIndex + 1) {
    ASSERT(next >= 0);

    SSyncLogBufEntry* pSlot = &pBuf->entries[next % pBuf->size];
    SyncIndex         prevLogIndex = pSlot->prevLogIndex;
    SyncTerm          prevLogTerm = pSlot->prevLogTerm;
    SSyncRaftEntry*   pEntry = pSlot->pItem;

    // a gap right after the match index: wait for more entries
    if (pEntry == NULL) {
      sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      break;
    }

    ASSERT(next == pEntry->index);

    // the entry at the current match index must be the direct predecessor
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    ASSERT(pMatch != NULL);
    ASSERT(pMatch->index == pBuf->matchIndex);
    ASSERT(pMatch->index + 1 == pEntry->index);
    ASSERT(prevLogIndex == pMatch->index);

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, mismatching raft log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      break;
    }

    // move the match index forward
    pBuf->matchIndex = next;
    sDebug("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
           pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // replicate on demand (before the local persist)
    (void)syncNodeReplicate(pNode);

    if (syncLogStorePersist(pLogStore, pEntry) < 0) {
      sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId,
             terrstr(), pEntry->index);
      break;
    }
    ASSERT(pEntry->index == pBuf->matchIndex);

    // update my match index
    persisted = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }

  // roll back any advance whose persist did not succeed
  pBuf->matchIndex = persisted;
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return persisted;
}

/*
 * Pass one committed raft entry to the FSM's commit callback.
 *
 * The entry is converted back to its original RPC message. Callback metadata
 * carries the entry's index, term, seqNum and isWeak flag, plus this node's
 * role and current term. lastConfigIndex and flag are -1 and code is 0.
 * Always returns 0; the callback's outcome is not inspected.
 */
int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
  ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");

  SRpcMsg rpcMsg;
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  // fields not named here are zero-initialized
  SFsmCbMeta cbMeta = {
      .index = pEntry->index,
      .lastConfigIndex = -1,
      .isWeak = pEntry->isWeak,
      .code = 0,
      .state = role,
      .seqNum = pEntry->seqNum,
      .term = pEntry->term,
      .currentTerm = term,
      .flag = -1,
  };

  pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
  return 0;
}

/*
 * Assert the log buffer's structural invariants. Always returns 0.
 *
 * The invariants are:
 *   - startIndex <= matchIndex;
 *   - commitIndex <= matchIndex < endIndex;
 *   - the live window [startIndex, endIndex) fits in `size` slots;
 *   - the slot at matchIndex holds an entry.
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  // ordering of the cursors
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex < pBuf->endIndex);

  // the live window must fit inside the ring
  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);

  // the match position must be populated (matchIndex may be -1, hence "+ size")
  ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem != NULL);

  return 0;
}

/*
 * Apply entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)]
 * to the FSM and advance pBuf->commitIndex accordingly.
 *
 * Entries whose original RPC type is not a user commit are not passed to the
 * FSM but still count as committed. After applying, slots below
 * pBuf->commitIndex - (size - used) / 2 are freed from the head of the ring.
 * Once pBuf->commitIndex reaches pNode->commitIndex, the FSM's restore-finish
 * callback is invoked (once).
 *
 * Returns 0 on success, including a stale notification. Returns -1 if an
 * entry could not be fetched or the FSM callback failed.
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SyncGroupId     vgId = pNode->vgId;
  int32_t         ret = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    ret = 0;
    goto _out;
  }

  sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64
         ", match index: %" PRId64 ", end index:%" PRId64 "",
         pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // execute in fsm: only entries that are both committed and matched locally
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry; when inBuf is false the entry is ours to free
    pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
    if (pEntry == NULL) {
      // every index up to matchIndex should be available; report the failure to the caller
      sError("vgId:%d, failed to get raft entry to commit. index:%" PRId64 "", vgId, index);
      ret = -1;
      goto _out;
    }

    // non-user entries are committed without being passed to the FSM
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index,
            pEntry->term);
      pBuf->commitIndex = index;
      if (!inBuf) {
        syncEntryDestroy(pEntry);
        pEntry = NULL;
      }
      continue;
    }

    if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) {
      sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
             pEntry->index, pEntry->term);
      ret = -1;
      goto _out;
    }
    pBuf->commitIndex = index;

    sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
           pEntry->index, pEntry->term, role, term);

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // Any remaining reference is owned by the buffer and may be freed by the
  // recycle below; drop it so it cannot dangle.
  pEntry = NULL;

  // recycle: free committed slots below commitIndex - (size - used) / 2
  SyncIndex used = pBuf->endIndex - pBuf->startIndex;
  SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2;
  for (SyncIndex index = pBuf->startIndex; index < until; index++) {
    SSyncRaftEntry* pStale = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    ASSERT(pStale != NULL);
    syncEntryDestroy(pStale);
    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    pBuf->startIndex = index + 1;
  }

_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
          pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

532 533
/*
 * Handle an AppendEntries request on a follower using the log buffer.
 *
 * Requests from nodes outside the raft group, and malformed requests, are
 * dropped without a reply. A request with a stale term gets a failure reply
 * carrying our current term. Otherwise the node steps down to the message
 * term and resets its election timer. The single raft entry carried in the
 * message is then offered to the log buffer.
 *
 * The reply's matchIndex is the buffer's match index after trying to proceed.
 * Success is reported only if the entry was accepted and the match index
 * reached it; in that case the leader's commit index is adopted. Newly
 * committed entries are then applied via syncLogBufferCommit.
 *
 * Always returns 0.
 */
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
  SRpcMsg            rpcRsp = {0};
  bool               accepted = false;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    syncLogRecvAppendEntries(ths, pMsg, "not in my config");
    goto _IGNORE;
  }

  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
  if (code != 0) {
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
    goto _IGNORE;
  }

  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
  // prepare response msg
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->success = false;
  pReply->matchIndex = SYNC_INDEX_INVALID;
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
  pReply->startTime = ths->startTime;

  // stale term: reply with our current term and success = false
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    goto _SEND_RESPONSE;
  }

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    pReply->term = pMsg->term;
  }

  syncNodeStepDown(ths, pMsg->term);
  syncNodeResetElectTimer(ths);

  if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
    goto _IGNORE;
  }

  SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
    goto _IGNORE;
  }

  if (pMsg->prevLogIndex + 1 != pEntry->index) {
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ",  term:%" PRId64 ", prevLogIndex:%" PRId64
           ", prevLogTerm:%" PRId64,
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
    // pEntry has not been handed to the log buffer yet, so it is still ours to free
    syncEntryDestroy(pEntry);
    goto _IGNORE;
  }

  sDebug("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
         ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
         ths->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);

  // accept; the log buffer takes ownership of pEntry whatever the outcome
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
    goto _SEND_RESPONSE;
  }
  accepted = true;

_SEND_RESPONSE:
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
  bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
  if (accepted && matched) {
    pReply->success = true;
    // update commit index only after matching
    (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
  }

  // ack, i.e. send response
  (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);

  // commit index, i.e. leader notice me
  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
  }
  return 0;

_IGNORE:
  rpcFreeCont(rpcRsp.pCont);
  return 0;
}

625 626 627
int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
  SRpcMsg            rpcRsp = {0};
628

M
Minghao Li 已提交
629
  // if already drop replica, do not process
M
Minghao Li 已提交
630 631
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    syncLogRecvAppendEntries(ths, pMsg, "not in my config");
M
Minghao Li 已提交
632 633 634
    goto _IGNORE;
  }

M
Minghao Li 已提交
635
  // prepare response msg
636
  int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
637 638 639 640 641 642
  if (code != 0) {
    syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
    goto _IGNORE;
  }

  SyncAppendEntriesReply* pReply = rpcRsp.pCont;
M
Minghao Li 已提交
643 644 645 646
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->success = false;
M
Minghao Li 已提交
647 648
  // pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
  pReply->matchIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
649 650 651 652
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
  pReply->startTime = ths->startTime;

  if (pMsg->term < ths->pRaftStore->currentTerm) {
M
Minghao Li 已提交
653
    syncLogRecvAppendEntries(ths, pMsg, "reject, small term");
M
Minghao Li 已提交
654 655 656 657 658 659 660 661 662 663 664 665 666 667
    goto _SEND_RESPONSE;
  }

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    pReply->term = pMsg->term;
  }

  syncNodeStepDown(ths, pMsg->term);
  syncNodeResetElectTimer(ths);

  SyncIndex startIndex = ths->pLogStore->syncLogBeginIndex(ths->pLogStore);
  SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);

  if (pMsg->prevLogIndex > lastIndex) {
M
Minghao Li 已提交
668
    syncLogRecvAppendEntries(ths, pMsg, "reject, index not match");
M
Minghao Li 已提交
669 670 671 672 673
    goto _SEND_RESPONSE;
  }

  if (pMsg->prevLogIndex >= startIndex) {
    SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
M
Minghao Li 已提交
674 675 676 677 678
    // ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
    if (myPreLogTerm == SYNC_TERM_INVALID) {
      syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid");
      goto _SEND_RESPONSE;
    }
M
Minghao Li 已提交
679 680

    if (myPreLogTerm != pMsg->prevLogTerm) {
M
Minghao Li 已提交
681
      syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
M
Minghao Li 已提交
682 683 684 685 686 687 688 689
      goto _SEND_RESPONSE;
    }
  }

  // accept
  pReply->success = true;
  bool hasAppendEntries = pMsg->dataLen > 0;
  if (hasAppendEntries) {
690
    SSyncRaftEntry* pAppendEntry = syncEntryBuildFromAppendEntries(pMsg);
M
Minghao Li 已提交
691 692
    ASSERT(pAppendEntry != NULL);

693 694 695 696 697 698
    SyncIndex appendIndex = pMsg->prevLogIndex + 1;

    LRUHandle* hLocal = NULL;
    LRUHandle* hAppend = NULL;

    int32_t         code = 0;
M
Minghao Li 已提交
699
    SSyncRaftEntry* pLocalEntry = NULL;
700 701 702 703 704 705 706 707 708 709 710 711 712 713
    SLRUCache*      pCache = ths->pLogStore->pCache;
    hLocal = taosLRUCacheLookup(pCache, &appendIndex, sizeof(appendIndex));
    if (hLocal) {
      pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal);
      code = 0;

      sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry);

    } else {
      sNTrace(ths, "miss cache index:%" PRId64, appendIndex);

      code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
    }

M
Minghao Li 已提交
714
    if (code == 0) {
715 716
      // get local entry success

M
Minghao Li 已提交
717 718
      if (pLocalEntry->term == pAppendEntry->term) {
        // do nothing
S
Shengliang Guan 已提交
719
        sNTrace(ths, "log match, do nothing, index:%" PRId64, appendIndex);
M
Minghao Li 已提交
720 721 722 723 724 725

      } else {
        // truncate
        code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
        if (code != 0) {
          char logBuf[128];
S
Shengliang Guan 已提交
726
          snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%" PRId64, appendIndex);
M
Minghao Li 已提交
727 728
          syncLogRecvAppendEntries(ths, pMsg, logBuf);

729 730 731
          if (hLocal) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
          } else {
732
            syncEntryDestroy(pLocalEntry);
733 734 735 736 737
          }

          if (hAppend) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
          } else {
738
            syncEntryDestroy(pAppendEntry);
739 740
          }

M
Minghao Li 已提交
741 742 743
          goto _IGNORE;
        }

B
Benguang Zhao 已提交
744 745
        ASSERT(pAppendEntry->index == appendIndex);

M
Minghao Li 已提交
746 747 748 749
        // append
        code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
        if (code != 0) {
          char logBuf[128];
S
Shengliang Guan 已提交
750
          snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
M
Minghao Li 已提交
751 752
          syncLogRecvAppendEntries(ths, pMsg, logBuf);

753 754 755
          if (hLocal) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
          } else {
756
            syncEntryDestroy(pLocalEntry);
757 758 759 760 761
          }

          if (hAppend) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
          } else {
762
            syncEntryDestroy(pAppendEntry);
763 764
          }

M
Minghao Li 已提交
765 766
          goto _IGNORE;
        }
767 768

        syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
M
Minghao Li 已提交
769 770 771 772 773 774 775 776 777 778
      }

    } else {
      if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
        // log not exist

        // truncate
        code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
        if (code != 0) {
          char logBuf[128];
S
Shengliang Guan 已提交
779
          snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, truncate error, append-index:%" PRId64, appendIndex);
M
Minghao Li 已提交
780 781
          syncLogRecvAppendEntries(ths, pMsg, logBuf);

782 783
          syncEntryDestroy(pLocalEntry);
          syncEntryDestroy(pAppendEntry);
M
Minghao Li 已提交
784 785 786 787 788 789 790
          goto _IGNORE;
        }

        // append
        code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
        if (code != 0) {
          char logBuf[128];
S
Shengliang Guan 已提交
791
          snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
M
Minghao Li 已提交
792 793
          syncLogRecvAppendEntries(ths, pMsg, logBuf);

794 795 796
          if (hLocal) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
          } else {
797
            syncEntryDestroy(pLocalEntry);
798 799 800 801 802
          }

          if (hAppend) {
            taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
          } else {
803
            syncEntryDestroy(pAppendEntry);
804 805
          }

M
Minghao Li 已提交
806 807 808
          goto _IGNORE;
        }

809 810
        syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);

M
Minghao Li 已提交
811
      } else {
812
        // get local entry success
M
Minghao Li 已提交
813
        char logBuf[128];
814 815
        snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex,
                 terrno);
M
Minghao Li 已提交
816 817
        syncLogRecvAppendEntries(ths, pMsg, logBuf);

818 819 820
        if (hLocal) {
          taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
        } else {
821
          syncEntryDestroy(pLocalEntry);
822 823 824 825 826
        }

        if (hAppend) {
          taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
        } else {
827
          syncEntryDestroy(pAppendEntry);
828 829
        }

M
Minghao Li 已提交
830 831 832 833 834 835
        goto _IGNORE;
      }
    }

    // update match index
    pReply->matchIndex = pAppendEntry->index;
M
Minghao Li 已提交
836

837 838 839
    if (hLocal) {
      taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
    } else {
840
      syncEntryDestroy(pLocalEntry);
841 842 843 844 845
    }

    if (hAppend) {
      taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
    } else {
846
      syncEntryDestroy(pAppendEntry);
847
    }
M
Minghao Li 已提交
848

M
Minghao Li 已提交
849 850 851 852 853 854 855
  } else {
    // no append entries, do nothing
    // maybe has extra entries, no harm

    // update match index
    pReply->matchIndex = pMsg->prevLogIndex;
  }
M
Minghao Li 已提交
856 857

  // maybe update commit index, leader notice me
858
  syncNodeFollowerCommit(ths, pMsg->commitIndex);
M
Minghao Li 已提交
859

M
Minghao Li 已提交
860
  syncLogRecvAppendEntries(ths, pMsg, "accept");
M
Minghao Li 已提交
861 862 863
  goto _SEND_RESPONSE;

_IGNORE:
864
  rpcFreeCont(rpcRsp.pCont);
M
Minghao Li 已提交
865 866 867 868 869 870 871
  return 0;

_SEND_RESPONSE:
  // msg event log
  syncLogSendAppendEntriesReply(ths, pReply, "");

  // send response
872
  syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);
M
Minghao Li 已提交
873
  return 0;
B
Benguang Zhao 已提交
874
}