syncPipeline.c 39.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE

// Own header first, followed by the other sync module headers this file relies on.
#include "syncPipeline.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncUtil.h"

29 30 31 32 33
/*
 * Tell whether `type` is one of the vnode message types this file treats as
 * "blocking": create/alter/drop table, update tag value, or alter confirm.
 */
static bool syncIsMsgBlock(tmsg_t type) {
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_TABLE) return true;
  if (type == TDMT_VND_DROP_TABLE) return true;
  if (type == TDMT_VND_UPDATE_TAG_VAL) return true;
  return type == TDMT_VND_ALTER_CONFIRM;
}

34 35 36 37 38 39 40 41 42 43 44 45 46
/*
 * Read pBuf->endIndex while holding the buffer mutex and return the value.
 */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}

/*
 * Append pEntry at the tail (endIndex) of the log buffer.
 *
 * The buffer mutex is held for the whole call. The entry is stored in the ring
 * slot `index % size` together with the index/term of the entry at `index - 1`,
 * and endIndex is advanced to `index + 1`.
 *
 * Returns 0 on success, or -1 when `index - startIndex >= size` (buffer full).
 * The function asserts that the entry index equals endIndex, that the target
 * slot is empty, and that the preceding slot holds the entry at `index - 1`.
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index);
    goto _err;
  }

  ASSERT(index == pBuf->endIndex);

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  ASSERT(pExist == NULL);

  // initial log buffer with at least one item, e.g. commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  ASSERTS(pMatch != NULL, "no matched log entry");
  ASSERT(pMatch->index + 1 == index);

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_err:
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return -1;
}

/*
 * Look up the term of the log entry at `index - 1`.
 *
 * Sources are tried in this order:
 *   1. the log buffer, when prevIndex >= pBuf->startIndex;
 *   2. the replication manager's state ring, when pMgr is non-NULL and
 *      prevIndex lies in [pMgr->startIndex, pMgr->endIndex);
 *   3. the FSM snapshot info, when prevIndex == snapshot.lastApplyIndex;
 *   4. the log store, via syncLogGetEntry.
 *
 * Returns 0 when prevIndex is -1 and the log store begins at index 0.
 * Returns -1 with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST when prevIndex is
 * beyond pBuf->matchIndex or when no source yields the term.
 * terrno is reset to TSDB_CODE_SUCCESS on entry.
 *
 * NOTE(review): pBuf fields are read without taking pBuf->mutex here;
 * presumably callers already hold it -- confirm against call sites.
 */
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  terrno = TSDB_CODE_SUCCESS;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) return 0;

  if (prevIndex > pBuf->matchIndex) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  // 1. still present in the log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    ASSERTS(pEntry != NULL, "no log entry found");
    prevLogTerm = pEntry->term;
    return prevLogTerm;
  }

  // 2. recorded in the replication manager's state ring
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    ASSERTS(timeMs != 0, "no log entry found");
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    ASSERT(prevIndex == 0 || prevLogTerm != 0);
    return prevLogTerm;
  }

  // 3. matches the last applied index reported by the FSM snapshot info
  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    return snapshot.lastApplyTerm;
  }

  // 4. fall back to the log store; the fetched entry is owned here and released
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return prevLogTerm;
  }

  sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

/*
 * Build a placeholder entry for (term, index, vgId) by delegating to
 * syncEntryBuildNoop; the result of that call is returned unchanged.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
/*
 * Check that the log store's index range is compatible with commitIndex:
 * the first index must not exceed commitIndex + 1 and the last index must
 * not be below commitIndex.
 *
 * Returns 0 when both conditions hold, -1 (after logging an error) otherwise.
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex walFirst = pStore->syncLogBeginIndex(pStore);
  if (commitIndex + 1 < walFirst) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer: %" PRId64
           ", tsdb commit version: %" PRId64 "",
           pNode->vgId, walFirst, commitIndex);
    return -1;
  }

  SyncIndex walLast = pStore->syncLogLastIndex(pStore);
  if (commitIndex > walLast) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
           ", tsdb commit version: %" PRId64 "",
           pNode->vgId, walLast, commitIndex);
    return -1;
  }

  return 0;
}

148
/*
 * Populate the log buffer from the FSM snapshot info and the log store.
 * The caller is expected to hold pBuf->mutex (this function does not lock).
 *
 * Steps:
 *   - commitIndex/commitTerm come from FpGetSnapshotInfo (term clamped to >= 0);
 *   - the log store range is validated against commitIndex;
 *   - matchIndex is set to the log store's last index and endIndex to one past it;
 *   - entries are loaded from the last index downwards until commitIndex is
 *     reached, a read fails, or the buffer has only `emptySize` free slots left;
 *   - if commitIndex was reached, a dummy entry is placed at commitIndex.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE
 * when validation fails, or terrno = TSDB_CODE_OUT_OF_MEMORY when the dummy
 * entry cannot be built.
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    goto _err;
  }

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer >= commitIndex);
  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      break;
    }

    // keep the entry only while at least `emptySize` slots would remain free
    bool taken = false;
    int  emptySize = 5;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
    }

    // the successor slot learns this entry's index/term as its prev-log info
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    ASSERT(index == pBuf->commitIndex);

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  sInfo("vgId:%d, init sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  syncLogBufferValidate(pBuf);
  return 0;

_err:
  return -1;
}

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
/*
 * Locked wrapper around syncLogBufferInitWithoutLock: takes pBuf->mutex,
 * runs the initialization, releases the mutex and returns its result.
 */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code;

  taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}

/*
 * Drop every entry held in [startIndex, endIndex), zero the four buffer
 * indexes, and run syncLogBufferInitWithoutLock again, all under pBuf->mutex.
 *
 * Returns whatever syncLogBufferInitWithoutLock returns; a negative result is
 * logged as an error.
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(cursor + pBuf->size) % pBuf->size];
    // only occupied slots are released and cleared; empty ones are left as-is
    if (pSlot->pItem != NULL) {
      syncEntryDestroy(pSlot->pItem);
      memset(pSlot, 0, sizeof(pBuf->entries[0]));
    }
    cursor++;
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;

  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
  }

  taosThreadMutexUnlock(&pBuf->mutex);
  return code;
}

267 268 269
/*
 * Return the term of the entry stored at pBuf->matchIndex.
 * Asserts that the slot is occupied. Does not take pBuf->mutex.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncIndex       index = pBuf->matchIndex;
  SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
  ASSERT(pEntry != NULL);
  return pEntry->term;
}

/*
 * Try to place a received entry into the log buffer (buffer mutex held throughout).
 *
 * Outcomes:
 *   - index <= commitIndex: nothing is stored; returns 0 only if the term
 *     already recorded at `index` equals pEntry->term, else -1;
 *   - index outside the buffer window (index - startIndex >= size): returns -1;
 *   - index > matchIndex but prevTerm differs from the term at matchIndex: returns -1;
 *   - an entry already exists at `index`: same term -> duplicate, returns 0;
 *     different term -> the buffer is rolled back to `index` and the new entry stored;
 *   - otherwise the entry is stored with (prevIndex, prevTerm) and endIndex is
 *     raised to at least index + 1; returns 0.
 *
 * Ownership: when the entry is stored the buffer keeps it (local pointer is
 * cleared); on every other path pEntry is destroyed before returning.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  int32_t         ret = -1;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    // "prev log term" of index + 1 is the term currently recorded at `index`
    SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1);
    ASSERT(pEntry->term >= 0);
    if (term == pEntry->term) {
      ret = 0;
    }
    goto _out;
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64
          " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    goto _out;
  }

  // check current in buffer
  pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
  if (pExist != NULL) {
    ASSERT(pEntry->index == pExist->index);
    if (pEntry->term != pExist->term) {
      (void)syncLogBufferRollback(pBuf, pNode, index);
    } else {
      sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index);
      ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
      ret = 0;
      goto _out;
    }
  }

  // update
  ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pEntry = NULL;  // ownership moved into the buffer slot
  pBuf->entries[index % pBuf->size] = tmp;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  ret = 0;

_out:
  syncEntryDestroy(pEntry);
  // pExist is only owned here when it did not come from the buffer
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

/*
 * Write pEntry to the log store as its new last entry.
 *
 * If the store already contains pEntry->index or beyond, it is first truncated
 * from pEntry->index. The function asserts that the entry index is exactly
 * lastIndex + 1 before appending and equal to lastIndex afterwards.
 *
 * Returns 0 on success, -1 if truncation or append fails.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  ASSERT(pEntry->index >= 0);
  SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
  if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
    sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
    return -1;
  }
  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer + 1);

  if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
    sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
           pEntry->term);
    return -1;
  }

  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer);
  return 0;
}

375
/*
 * Advance pBuf->matchIndex over consecutive buffered entries and persist each
 * one to the log store (buffer mutex held throughout).
 *
 * For every index following matchIndex the loop stops when the slot is empty
 * or when the slot's recorded prevLogTerm differs from the term of the entry
 * at matchIndex. Otherwise matchIndex is bumped, replication is triggered,
 * the entry is persisted, and this node's match index is recorded in
 * pNode->pMatchIndex.
 *
 * On exit pBuf->matchIndex is set to the last index that was persisted
 * successfully (so a bump followed by a failed persist is undone).
 *
 * pMatchTerm: optional out-parameter receiving the term of the entry at the
 * resulting match index.
 * Returns the resulting match index.
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    ASSERT(index >= 0);

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      goto _out;
    }

    ASSERT(index == pEntry->index);

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    ASSERT(pMatch != NULL);
    ASSERT(pMatch->index == pBuf->matchIndex);
    ASSERT(pMatch->index + 1 == pEntry->index);
    ASSERT(prevLogIndex == pMatch->index);

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, mismatching sync log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sTrace("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
           pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // replicate on demand
    (void)syncNodeReplicateWithoutLock(pNode);

    // persist
    if (syncLogStorePersist(pLogStore, pEntry) < 0) {
      sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
             pEntry->index);
      goto _out;
    }
    ASSERT(pEntry->index == pBuf->matchIndex);

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  // restore to the last successfully persisted index
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    *pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return matchIndex;
}

448 449
/*
 * Hand one log entry to the FSM commit callback.
 *
 * Returns 0 without calling the FSM when the node has a single replica, has
 * finished restoring, and its vgId is not 1. Otherwise the entry is converted
 * to its original RPC message, callback metadata is filled from the entry and
 * the arguments, the pending response info for the entry's seqNum is fetched
 * (and removed) from the response manager, and FpCommitCb is invoked.
 *
 * Returns the value returned by FpCommitCb.
 */
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                          int32_t applyCode) {
  if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  SRpcMsg rpcMsg = {.code = applyCode};
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  SFsmCbMeta cbMeta = {0};
  cbMeta.index = pEntry->index;
  cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
  cbMeta.isWeak = pEntry->isWeak;
  cbMeta.code = applyCode;
  cbMeta.state = role;
  cbMeta.seqNum = pEntry->seqNum;
  cbMeta.term = pEntry->term;
  cbMeta.currentTerm = term;
  cbMeta.flag = -1;

  (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
  int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
  return code;
}

/*
 * Assert the log buffer invariants:
 *   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the slot at matchIndex is occupied.
 * Always returns 0. Does not take pBuf->mutex.
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex < pBuf->endIndex);
  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
  ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem);
  return 0;
}

/*
 * Apply log entries up to commitIndex to the FSM and recycle old buffer slots
 * (buffer mutex held throughout).
 *
 * - A commitIndex not greater than pBuf->commitIndex is treated as stale and
 *   returns 0 without applying anything.
 * - Entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] are
 *   fetched with syncLogBufferGetOneEntry and executed via syncLogFsmExecute;
 *   pBuf->commitIndex advances after each successful execution.
 * - Afterwards, entries from startIndex up to (but excluding)
 *   pBuf->commitIndex - size/16 are destroyed and startIndex moves forward.
 * - On every exit path, if the node has not finished restoring and
 *   pBuf->commitIndex has reached pNode->commitIndex, FpRestoreFinishCb is
 *   called and restoreFinish is set.
 *
 * Returns 0 on success or stale index, -1 if an entry cannot be fetched or
 * its execution fails.
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SyncGroupId     vgId = pNode->vgId;
  int32_t         ret = -1;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    ret = 0;
    goto _out;
  }

  sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role: %d, term: %" PRId64,
         pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
            pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
      sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
             ", role: %d, current term: %" PRId64,
             vgId, pEntry->index, pEntry->term, role, term);
      goto _out;
    }
    pBuf->commitIndex = index;

    sTrace("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
           pEntry->index, pEntry->term, role, term);

    // entries that did not come from the buffer are owned here
    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  SyncIndex until = pBuf->commitIndex - (pBuf->size >> 4);
  for (SyncIndex index = pBuf->startIndex; index < until; index++) {
    // distinct name: must not shadow the outer pEntry that _out may release
    SSyncRaftEntry* pOld = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    ASSERT(pOld != NULL);
    syncEntryDestroy(pOld);
    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    pBuf->startIndex = index + 1;
  }

  ret = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
          pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

571 572 573
/*
 * Clear a replication manager: zero every state slot in
 * [startIndex, endIndex), then reset startIndex, matchIndex, endIndex and
 * retryBackoff to 0 and mark the manager as not restored.
 * A NULL pMgr is ignored.
 */
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) return;

  ASSERT(pMgr->startIndex >= 0);
  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
  }
  pMgr->startIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->endIndex = 0;
  pMgr->restored = false;
  pMgr->retryBackoff = 0;
}

/*
 * Resend un-acked entries tracked by the replication manager whose retry wait
 * has elapsed.
 *
 * - Nothing is done (returns 0) when the manager window is empty.
 * - When retryBackoff has reached SYNC_MAX_RETRY_BACKOFF the manager is reset
 *   and -1 is returned.
 * - Otherwise the window [startIndex, endIndex) is scanned in order; the scan
 *   stops at the first slot whose wait has not elapsed yet, acked slots are
 *   skipped, and the others are re-sent with syncLogBufferReplicateOneTo,
 *   bounded by a batch size of max(1, size >> (4 + retryBackoff)).
 * - If anything was re-sent, retryBackoff is advanced via
 *   syncLogGetNextRetryBackoff.
 *
 * Returns 0 on success, -1 on reset or when a resend fails.
 */
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->endIndex <= pMgr->startIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplMgrReset(pMgr);
    sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer: %" PRIx64, pNode->vgId,
          pDestId->addr);
    return -1;
  }

  int32_t  ret = -1;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;
    ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));

    // not due yet: stop scanning
    if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      continue;
    }

    bool barrier = false;
    if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId,
             terrstr(), index, pDestId->addr);
      goto _out;
    }
    ASSERT(barrier == pMgr->states[pos].barrier);
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    retried = true;
    if (firstIndex == -1) firstIndex = index;

    if (batchSize <= count++) {
      break;
    }
  }

  ret = 0;
_out:
  if (retried) {
    pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
    sInfo("vgId:%d, resent %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64
          ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex);
  }
  return ret;
}

/*
 * Handle an append-entries reply while the replication manager is not yet
 * restored (asserted on entry).
 *
 * Empty manager window (endIndex == 0):
 *   - a reply with matchIndex < 0 marks the manager restored and returns 0.
 * Non-empty window:
 *   - a reply whose lastSendIndex is outside the window only triggers a retry;
 *   - otherwise the slot is marked acked; a successful reply with
 *     matchIndex == lastSendIndex records the match index and marks the
 *     manager restored;
 *   - an unsuccessful reply with matchIndex >= lastSendIndex starts a snapshot.
 *
 * In the remaining cases the probe index is min(reply matchIndex, local buffer
 * matchIndex). When the peer is behind the local buffer, the local term at
 * that index is compared with pMsg->lastMatchTerm: if the term is unavailable,
 * or mismatches at the beginning of the log store, a snapshot is started;
 * if the terms agree the probe index moves one forward. Finally the manager
 * is reset and a single probe is sent at the chosen index.
 *
 * Returns 0 on handled cases, -1 when starting a snapshot fails, or the result
 * of syncLogReplMgrReplicateProbeOnce.
 */
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
                                                 SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  ASSERT(pMgr->restored == false);
  char     host[64] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  if (pMgr->endIndex == 0) {
    ASSERT(pMgr->startIndex == 0);
    ASSERT(pMgr->matchIndex == 0);
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  } else {
    if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
      // reply does not refer to anything in flight; best-effort retry only
      (void)syncLogReplMgrRetryOnNeed(pMgr, pNode);
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
      sWarn("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64,
            pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex);
      if (syncNodeStartSnapshot(pNode, &destId) < 0) {
        sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
      return 0;
    }
  }

  // check last match term
  SyncTerm  term = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);

  if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
    term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1);
    if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
      ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
      if (syncNodeStartSnapshot(pNode, &destId) < 0) {
        sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
      return 0;
    }

    ASSERT(index + 1 >= firstVer);

    if (term == pMsg->lastMatchTerm) {
      index = index + 1;
      ASSERT(index <= pNode->pLogBuf->matchIndex);
    } else {
      ASSERT(index > firstVer);
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplMgrReset(pMgr);
  return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index);
}

/*
 * Handle a heartbeat reply: under the log buffer mutex, if the reply carries a
 * non-zero startTime that differs from the one recorded for the peer, reset
 * the replication manager and record the new start time.
 * Always returns 0.
 */
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  taosThreadMutexLock(&pBuf->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplMgrReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

/*
 * Handle an append-entries reply under the log buffer mutex.
 *
 * If the reply's startTime differs from the one recorded for the peer, the
 * replication manager is reset and the new start time recorded. The reply is
 * then dispatched to the normal-mode handler when the manager is restored, or
 * to the recovery-mode handler otherwise; the handler's result is ignored.
 * Always returns 0.
 */
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  taosThreadMutexLock(&pBuf->mutex);
  if (pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer: %" PRIx64 ", start time:%" PRId64
          ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplMgrReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }

  if (pMgr->restored) {
    (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg);
  } else {
    (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg);
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

765
/*
 * Run one replication step for a peer: a regular replication attempt when the
 * manager is restored, otherwise a single probe at the local log buffer's
 * matchIndex. The result of either step is ignored; always returns 0.
 */
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->restored) {
    (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
  } else {
    (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex);
  }
  return 0;
}

774
/*
 * Send a single probe entry at `index` to the peer of a not-yet-restored
 * replication manager (asserted).
 *
 * If a probe is already in flight (non-empty window) and was sent less than
 * SYNC_LOG_REPL_RETRY_WAIT_MS * 2^SYNC_MAX_RETRY_BACKOFF ms ago, nothing is
 * done and 0 is returned. Otherwise the manager is reset, the entry at `index`
 * is replicated, and the manager window becomes [index, index + 1) with the
 * slot recording barrier flag, send time, term and acked = false.
 *
 * Returns 0 on success or when skipped, -1 when replication fails.
 */
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  ASSERT(!pMgr->restored);
  ASSERT(pMgr->startIndex >= 0);
  int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
  int64_t nowMs = taosGetMonoTimestampMs();

  if (pMgr->endIndex > pMgr->startIndex &&
      nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  syncLogReplMgrReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;
  if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
    sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
           terrstr(), index, pDestId->addr);
    return -1;
  }

  ASSERT(index >= 0);
  pMgr->states[index % pMgr->size].barrier = barrier;
  pMgr->states[index % pMgr->size].timeMs = nowMs;
  pMgr->states[index % pMgr->size].term = term;
  pMgr->states[index % pMgr->size].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
         ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}

// Replicate a batch of entries to a restored peer.
//
// Starting at pMgr->endIndex, entries up to the log buffer's matchIndex are
// sent one by one. The loop stops when:
//   - the batch allowance (derived from pMgr->size and retryBackoff) is used up,
//   - the tracked window [startIndex, index) reaches half of pMgr->size,
//   - the previously sent entry was a barrier, or
//   - the entry just sent is a barrier.
// Each sent entry is recorded in pMgr->states and pMgr->endIndex is advanced.
// syncLogReplMgrRetryOnNeed() is then called.
//
// Returns 0 on success, -1 as soon as one entry fails to replicate.
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  ASSERT(pMgr->restored);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t  count = 0;  // number of entries actually sent in this call
  int64_t  nowMs = taosGetMonoTimestampMs();
  int64_t  limit = pMgr->size >> 1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    // count is only bumped after a successful send, so the number of entries
    // sent per call is unchanged while the traced count stays exact
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // do not send past an outstanding barrier
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }

    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm term = -1;
    if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
      sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
             terrstr(), index, pDestId->addr);
      return -1;
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    pMgr->endIndex = index + 1;
    count++;

    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
            ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
            pMgr->endIndex);
      break;
    }
  }

  syncLogReplMgrRetryOnNeed(pMgr, pNode);

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
         "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}

// Handle an append-entries reply from a restored peer.
//
// A reply whose lastSendIndex lies outside the tracked window
// [startIndex, endIndex) changes no state. For a reply inside the window:
//   - retryBackoff is lowered by one when it is positive, startIndex is behind
//     matchIndex, and the time between the first and last tracked sends is
//     positive but shorter than the wait time of the previous backoff level,
//   - the replied slot is marked acked,
//   - matchIndex is raised to the reply's matchIndex if that is larger,
//   - state slots in [startIndex, matchIndex) are zeroed and startIndex moves
//     up to matchIndex.
// In both cases another replication attempt follows and its result is returned.
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  ASSERT(pMgr->restored == true);

  bool inWindow = (pMgr->startIndex <= pMsg->lastSendIndex) && (pMsg->lastSendIndex < pMgr->endIndex);
  if (!inWindow) {
    return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
  }

  if (pMgr->retryBackoff > 0 && pMgr->startIndex < pMgr->matchIndex) {
    int64_t oldestSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
    int64_t newestSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
    int64_t spanMs = newestSentMs - oldestSentMs;
    if (spanMs > 0 && spanMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
      pMgr->retryBackoff -= 1;
    }
  }

  pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
  pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

  // release the slots that are now covered by matchIndex
  SyncIndex index = pMgr->startIndex;
  while (index < pMgr->matchIndex) {
    memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
    index++;
  }
  pMgr->startIndex = pMgr->matchIndex;

  return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
}

SSyncLogReplMgr* syncLogReplMgrCreate() {
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
  if (pMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);

894
  ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912

  return pMgr;

_err:
  taosMemoryFree(pMgr);
  return NULL;
}

// Free a replication manager created by syncLogReplMgrCreate(). NULL is ignored.
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosMemoryFree(pMgr);
  }
}

// Create one log replication manager for each of the TSDB_MAX_REPLICA slots
// of pNode and tag each with its slot number as peerId.
//
// Every slot is expected to be NULL on entry. Returns 0 on success. If a
// manager cannot be allocated, returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY; managers created so far stay in pNode and can be
// released with syncNodeLogReplMgrDestroy().
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
  for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
    ASSERT(pNode->logReplMgrs[i] == NULL);
    pNode->logReplMgrs[i] = syncLogReplMgrCreate();
    // check the allocation before touching the new manager
    if (pNode->logReplMgrs[i] == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    pNode->logReplMgrs[i]->peerId = i;
  }
  return 0;
}

// Destroy every log replication manager held by pNode and clear its slot.
void syncNodeLogReplMgrDestroy(SSyncNode* pNode) {
  int slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    SSyncLogReplMgr** ppMgr = &pNode->logReplMgrs[slot];
    syncLogReplMgrDestroy(*ppMgr);
    *ppMgr = NULL;
    slot++;
  }
}

SSyncLogBuffer* syncLogBufferCreate() {
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

937
  ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
938

939 940 941 942 943 944 945 946 947 948 949 950 951
  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    sError("failed to init log buffer mutexattr due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    sError("failed to set log buffer mutexattr type due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
952 953 954 955
    sError("failed to init log buffer mutex due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
956

957 958 959 960 961 962 963
  return pBuf;

_err:
  taosMemoryFree(pBuf);
  return NULL;
}

964 965 966 967 968 969 970 971 972 973 974 975 976
// Destroy every entry held in [startIndex, endIndex), zero its slot, and
// reset all four buffer indexes to 0. Runs under pBuf->mutex.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  taosThreadMutexLock(&pBuf->mutex);

  SyncIndex index = pBuf->startIndex;
  while (index < pBuf->endIndex) {
    int64_t         slot = (index + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pHeld = pBuf->entries[slot].pItem;
    if (pHeld != NULL) {
      syncEntryDestroy(pHeld);
      memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
    index++;
  }

  pBuf->startIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->endIndex = 0;

  taosThreadMutexUnlock(&pBuf->mutex);
}

977 978 979 980
// Release a log buffer created by syncLogBufferCreate(): clear its entries,
// destroy the mutex and its attribute object, then free the buffer itself.
// NULL is ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    (void)taosMemoryFree(pBuf);
  }
}

988
// Roll the log buffer and the log store back so that `toIndex` is the next
// index to be appended.
//
// Steps:
//   1. destroy buffered entries in [toIndex, endIndex) and pull endIndex and
//      matchIndex back,
//   2. truncate the log store from toIndex when it holds entries at or beyond it,
//   3. re-initialize the buffer from the log store when toIndex does not lie
//      above pBuf->startIndex.
//
// This function does not lock pBuf->mutex itself; syncLogBufferReset() calls
// it while holding that mutex.
//
// Returns 0 on success, -1 if truncating the log store or refilling the
// buffer fails.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);

  sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
        ", %" PRId64 ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // trunc buffer
  SyncIndex index;
  for (index = pBuf->endIndex - 1; index >= toIndex; index--) {
    SSyncRaftEntry* pStale = pBuf->entries[index % pBuf->size].pItem;
    if (pStale == NULL) {
      continue;
    }
    (void)syncEntryDestroy(pStale);
    memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
  ASSERT(index + 1 == toIndex);

  // trunc wal
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    if (pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex) < 0) {
      sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, terrstr(), toIndex);
      return -1;
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(toIndex == lastVer + 1);

  // refill buffer on need
  if (toIndex <= pBuf->startIndex && syncLogBufferInitWithoutLock(pBuf, pNode) < 0) {
    sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
    return -1;
  }

  ASSERT(pBuf->endIndex == toIndex);
  syncLogBufferValidate(pBuf);
  return 0;
}

// Drop everything in the log buffer beyond matchIndex and reset the
// replication manager of each of the node's replicaNum replicas.
//
// Runs under pBuf->mutex. The log store's last index is expected to equal
// pBuf->matchIndex on entry. The result of the rollback is ignored and
// endIndex is set to matchIndex + 1 regardless. Always returns 0.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer == pBuf->matchIndex);

  (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);

  sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->replicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplMgrReset(pMgr);
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

// Look up the raft entry at `index`.
//
// Entries above pBuf->startIndex are taken from the in-memory buffer; entries
// at or below it are read from the node's log store.
//
// *pInBuf is written on every path: true when the returned pointer is the
// buffer's own item, false otherwise (including when NULL is returned).
// syncLogBufferReplicateOneTo() destroys the entry itself when *pInBuf is false.
//
// Returns NULL when index >= pBuf->endIndex. A failed log-store read is
// logged and whatever the store left in the entry pointer (initially NULL) is
// returned.
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) {
  SSyncRaftEntry* pEntry = NULL;

  // give the out-parameter a defined value even on the early return below
  *pInBuf = false;

  if (index >= pBuf->endIndex) {
    return NULL;
  }
  if (index > pBuf->startIndex) {  // startIndex might be dummy
    *pInBuf = true;
    pEntry = pBuf->entries[index % pBuf->size].pItem;
  } else {
    if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
    }
  }
  return pEntry;
}

// Replicate the single log entry at `index` to the peer pDestId.
//
// The entry is fetched with syncLogBufferGetOneEntry(), an append-entries
// message is built from it together with the previous log term, and the
// message is handed to syncNodeSendAppendEntries() (whose result is ignored).
//
// Outputs:
//   *pBarrier - whether the entry is a replication barrier,
//   *pTerm    - the entry's term (skipped when pTerm is NULL).
//
// If the entry cannot be fetched and terrno is TSDB_CODE_WAL_LOG_NOT_EXIST,
// the replication manager registered for pDestId is reset.
//
// An entry that did not come from the in-memory buffer is destroyed before
// returning. Returns 0 on success, -1 on failure.
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
                                    SRaftId* pDestId, bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index);
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // looked up by destination id; named differently from the pMgr parameter
      SSyncLogReplMgr* pDestMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pDestMgr) {
        sInfo("vgId:%d, reset sync log repl mgr of peer: %" PRIx64 " since %s. index: %" PRId64, pNode->vgId,
              pDestId->addr, terrstr(), index);
        (void)syncLogReplMgrReset(pDestMgr);
      }
    }
    goto _err;
  }
  *pBarrier = syncLogIsReplicationBarrier(pEntry);

  prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index);
  if (prevLogTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  if (syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut) < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
    goto _err;
  }

  (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut);

  sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
         pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);

  // entries read from the log store belong to this call; buffered ones do not
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return -1;
}