syncPipeline.c 39.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE

18
#include "syncPipeline.h"
19
#include "syncCommit.h"
20
#include "syncIndexMgr.h"
21 22 23 24
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
25 26
#include "syncRespMgr.h"
#include "syncSnapshot.h"
27 28
#include "syncUtil.h"

29 30 31 32 33
// Tells whether `type` is one of the vnode requests that this file reports as a
// "blocking msg" (create/alter/drop table, update tag value, alter confirm).
static bool syncIsMsgBlock(tmsg_t type) {
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_TABLE) return true;
  if (type == TDMT_VND_DROP_TABLE) return true;
  if (type == TDMT_VND_UPDATE_TAG_VAL) return true;
  if (type == TDMT_VND_ALTER_CONFIRM) return true;
  return false;
}

34 35 36 37 38 39 40 41 42 43 44 45 46
// Returns a snapshot of pBuf->endIndex, read while holding the buffer mutex.
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}

// Appends pEntry at the tail of the log buffer (under the buffer mutex).
//
// pEntry->index is expected to equal pBuf->endIndex, its slot must be empty, and the slot
// right before it must hold the entry with index - 1; these are enforced with ASSERT/ASSERTS.
// On success the slot takes over pEntry, records the previous entry's index/term next to it,
// and endIndex advances to index + 1.
//
// Returns 0 on success. Returns -1 when index - startIndex >= size (buffer full); on that
// path pEntry is destroyed here, so the caller must not use it afterwards.
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index);
    goto _out;
  }

  ASSERT(index == pBuf->endIndex);

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  ASSERT(pExist == NULL);

  // initial log buffer with at least one item, e.g. commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  ASSERTS(pMatch != NULL, "no matched log entry");
  ASSERT(pMatch->index + 1 == index);

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_out:
  // failure path: the entry was not stored, so release it here
  syncLogBufferValidate(pBuf);
  syncEntryDestroy(pEntry);
  taosThreadMutexUnlock(&pBuf->mutex);
  return -1;
}

// Looks up the term of the log entry that precedes `index` (i.e. the entry at index - 1).
//
// terrno is reset to TSDB_CODE_SUCCESS on entry. Sources are consulted in this order:
//   1. the log buffer, when index - 1 >= pBuf->startIndex;
//   2. the replication manager's state ring, when pMgr is non-NULL and index - 1 lies in
//      [pMgr->startIndex, pMgr->endIndex);
//   3. the FSM snapshot info, when index - 1 equals snapshot.lastApplyIndex;
//   4. the log store.
//
// Returns 0 when index - 1 == -1 and the log store begins at index 0.
// Returns -1 with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST when index - 1 is beyond
// pBuf->matchIndex or none of the sources above has the entry.
//
// This function does not take pBuf->mutex itself.
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  terrno = TSDB_CODE_SUCCESS;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) return 0;

  if (prevIndex > pBuf->matchIndex) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  ASSERT(index - 1 == prevIndex);

  // 1. still held by the log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    ASSERTS(pEntry != NULL, "no log entry found");
    prevLogTerm = pEntry->term;
    return prevLogTerm;
  }

  // 2. remembered by the replication manager for an in-flight message
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    ASSERTS(timeMs != 0, "no log entry found");
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    ASSERT(prevIndex == 0 || prevLogTerm != 0);
    return prevLogTerm;
  }

  // 3. matches the position reported by the FSM snapshot
  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    return snapshot.lastApplyTerm;
  }

  // 4. read it back from the log store; the fetched entry is owned here and released at once
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return prevLogTerm;
  }

  sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

// Builds the placeholder entry used by the log buffer; it is simply a noop entry
// created by syncEntryBuildNoop with the same term, index and vgId.
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// Checks that the log store of pNode lines up with `commitIndex`:
//   - its first index must not be greater than commitIndex + 1, and
//   - its last index must not be less than commitIndex.
// Returns 0 when both hold; otherwise logs an error and returns -1.
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex walBegin = pStore->syncLogBeginIndex(pStore);
  if (walBegin > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer: %" PRId64
           ", tsdb commit version: %" PRId64 "",
           pNode->vgId, walBegin, commitIndex);
    return -1;
  }

  // the last index is only queried once the first-index check has passed
  SyncIndex walLast = pStore->syncLogLastIndex(pStore);
  if (walLast < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
           ", tsdb commit version: %" PRId64 "",
           pNode->vgId, walLast, commitIndex);
    return -1;
  }

  return 0;
}

149
// Populates the log buffer from the FSM snapshot info and the log store.
// As the name says, no locking is done here; syncLogBufferInit/syncLogBufferReInit call it
// with pBuf->mutex held.
//
// Steps:
//   - commitIndex/commitTerm come from FpGetSnapshotInfo (a negative term is clamped to 0);
//   - matchIndex is the last index of the log store and endIndex is matchIndex + 1;
//   - entries are read from the log store in reverse order, from the last index down to
//     commitIndex + 1; loading stops early when an entry cannot be read or when only
//     `emptySize` free slots would remain in the buffer;
//   - if the walk reaches commitIndex, a dummy entry carrying commitTerm is stored there;
//   - startIndex is set to the lowest index that ended up in the buffer.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the log
// store does not line up with the commit index, or terrno = TSDB_CODE_OUT_OF_MEMORY when the
// dummy entry cannot be built.
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    goto _err;
  }

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer >= commitIndex);
  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      break;
    }

    // keep the entry only while at least `emptySize` slots of the buffer stay free
    bool taken = false;
    int  emptySize = 5;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
    }

    // the entry just read is the predecessor of the one loaded in the previous iteration
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    ASSERT(index == pBuf->commitIndex);

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  sInfo("vgId:%d, init sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  syncLogBufferValidate(pBuf);
  return 0;

_err:
  return -1;
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
// Locked wrapper: runs syncLogBufferInitWithoutLock while holding pBuf->mutex and
// hands back its return code unchanged.
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code;

  taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}

// Drops everything currently held by the log buffer and initializes it again, all under
// pBuf->mutex. Returns whatever syncLogBufferInitWithoutLock returns; a negative result
// is logged before returning.
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);

  // release every entry in [startIndex, endIndex) and wipe its slot; empty slots are skipped
  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(cursor + pBuf->size) % pBuf->size];
    if (pSlot->pItem != NULL) {
      syncEntryDestroy(pSlot->pItem);
      memset(pSlot, 0, sizeof(pBuf->entries[0]));
    }
    cursor++;
  }

  // start again from an empty window
  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;

  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
  }

  taosThreadMutexUnlock(&pBuf->mutex);
  return code;
}

268 269 270
// Returns the term of the entry stored at pBuf->matchIndex.
// The slot is expected to be populated (checked with ASSERT). No locking is done here.
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncIndex       index = pBuf->matchIndex;
  SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
  ASSERT(pEntry != NULL);
  return pEntry->term;
}

// Tries to place a received entry into the log buffer at pEntry->index (under the mutex).
//
// prevTerm is the term the sender claims for the entry at pEntry->index - 1.
//
// Ownership: pEntry is consumed in every case. It is either stored in the buffer or
// destroyed before returning.
//
// Returns 0 when
//   - the entry was stored,
//   - an entry with the same index and term is already present (duplicate), or
//   - the index is already committed and the locally known term for it equals pEntry->term.
// Returns -1 when
//   - the index is already committed but the terms differ,
//   - the index lies outside the buffer window, or
//   - the index is above matchIndex and prevTerm differs from the term at matchIndex.
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  int32_t         ret = -1;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    // "prev term of index + 1" is the term of the entry at `index`
    SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1);
    ASSERT(pEntry->term >= 0);
    if (term == pEntry->term) {
      ret = 0;
    }
    goto _out;
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64
          " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    goto _out;
  }

  // check current in buffer
  pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
  if (pExist != NULL) {
    ASSERT(pEntry->index == pExist->index);
    if (pEntry->term != pExist->term) {
      // conflicting entry at the same index: roll the buffer back from `index`
      (void)syncLogBufferRollback(pBuf, pNode, index);
    } else {
      sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index);
      ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
      ret = 0;
      goto _out;
    }
  }

  // update
  ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pEntry = NULL;  // the buffer slot owns the entry from here on
  pBuf->entries[index % pBuf->size] = tmp;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  ret = 0;

_out:
  // pEntry is NULL here if it was stored above
  syncEntryDestroy(pEntry);
  // an existing entry that did not come from the buffer is a private copy and must be released
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

// Writes pEntry to the log store so that it becomes the last entry of the store.
//
// If the store already reaches pEntry->index or beyond, it is first truncated from
// pEntry->index. After that the entry is expected to be exactly one past the store's last
// index (ASSERT), it is appended, and the store's last index is expected to equal
// pEntry->index (ASSERT).
//
// Returns 0 on success, -1 when truncation or appending fails (an error is logged).
// pEntry is not released here.
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  ASSERT(pEntry->index >= 0);
  SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
  if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
    sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
    return -1;
  }
  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer + 1);

  if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
    sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
           pEntry->term);
    return -1;
  }

  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer);
  return 0;
}

376
// Advances pBuf->matchIndex over the entries that directly follow it in the buffer
// (under the buffer mutex).
//
// For each next index, the entry must be present and the prevLogTerm recorded with it must
// equal the term of the entry at the current matchIndex. For every entry that qualifies:
//   - matchIndex is moved onto it,
//   - replication is triggered via syncNodeReplicateWithoutLock,
//   - the entry is persisted with syncLogStorePersist,
//   - this node's own slot in pNode->pMatchIndex is updated.
// The loop stops at the first missing entry, term mismatch or persist failure. On exit
// pBuf->matchIndex is set back to the last index whose persist succeeded.
//
// pMatchTerm (optional) receives the term of the entry at the returned match index.
// Returns the resulting match index.
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    ASSERT(index >= 0);

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      goto _out;
    }

    ASSERT(index == pEntry->index);

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    ASSERT(pMatch != NULL);
    ASSERT(pMatch->index == pBuf->matchIndex);
    ASSERT(pMatch->index + 1 == pEntry->index);
    ASSERT(prevLogIndex == pMatch->index);

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, mismatching sync log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sTrace("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
           pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // replicate on demand
    (void)syncNodeReplicateWithoutLock(pNode);

    // persist
    if (syncLogStorePersist(pLogStore, pEntry) < 0) {
      sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
             pEntry->index);
      goto _out;
    }
    ASSERT(pEntry->index == pBuf->matchIndex);

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  // `matchIndex` only moves after a successful persist, so this undoes a tentative advance
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    *pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return matchIndex;
}

449 450
// Hands one log entry to the state machine through pFsm->FpCommitCb.
//
// Returns 0 right away, without calling the callback, when the node has a single replica,
// has finished restoring and its vgId is not 1.
//
// Otherwise the original RPC message is rebuilt from the entry (with `applyCode` as its
// code), the callback metadata is filled from the entry, `role` and `term`, and the pending
// response info for the entry's seqNum is fetched and removed from pNode->pSyncRespMgr.
// Returns the value returned by FpCommitCb.
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                          int32_t applyCode) {
  if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  SRpcMsg rpcMsg = {.code = applyCode};
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  SFsmCbMeta cbMeta = {0};
  cbMeta.index = pEntry->index;
  cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
  cbMeta.isWeak = pEntry->isWeak;
  cbMeta.code = applyCode;
  cbMeta.state = role;
  cbMeta.seqNum = pEntry->seqNum;
  cbMeta.term = pEntry->term;
  cbMeta.currentTerm = term;
  cbMeta.flag = -1;

  // the result of the lookup is deliberately ignored
  (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
  int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
  return code;
}

// Asserts the invariants of the log buffer window:
//   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
//   the window [startIndex, endIndex) fits in `size` slots, and the slot at matchIndex
//   holds an entry.
// Always returns 0; violations are reported through ASSERT only. No locking is done here.
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex < pBuf->endIndex);
  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
  ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem);
  return 0;
}

// Applies committed entries to the state machine and recycles old buffer slots
// (under the buffer mutex).
//
// Entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] are executed one by one
// with syncLogFsmExecute; pBuf->commitIndex advances after each successful execution.
// Afterwards, slots from startIndex up to (but excluding) commitIndex - size/16 are released
// and startIndex moves forward accordingly.
//
// On every exit path: if the node has not finished restoring and pBuf->commitIndex has
// reached pNode->commitIndex, FpRestoreFinishCb is invoked and restoreFinish is set.
//
// Returns 0 on success or when `commitIndex` is stale (<= pBuf->commitIndex).
// Returns -1 when an entry cannot be obtained or its execution fails.
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SyncGroupId     vgId = pNode->vgId;
  int32_t         ret = -1;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    ret = 0;
    goto _out;
  }

  sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role: %d, term: %" PRId64,
         pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
            pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
      sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
             ", role: %d, current term: %" PRId64,
             vgId, pEntry->index, pEntry->term, role, term);
      goto _out;
    }
    pBuf->commitIndex = index;

    sTrace("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
           pEntry->index, pEntry->term, role, term);

    // an entry that did not come from the buffer is a private copy; release it right away
    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  SyncIndex until = pBuf->commitIndex - (pBuf->size >> 4);
  for (SyncIndex index = pBuf->startIndex; index < until; index++) {
    // separate name so the outer pEntry (checked at _out) is not shadowed
    SSyncRaftEntry* pRecycled = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    ASSERT(pRecycled != NULL);
    syncEntryDestroy(pRecycled);
    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    pBuf->startIndex = index + 1;
  }

  ret = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
          pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  // covers the early exits above where a private copy is still pending
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

572 573 574
// Clears a replication manager back to its empty state: zeroes every state slot in
// [startIndex, endIndex), then resets startIndex, matchIndex, endIndex and retryBackoff
// to 0 and `restored` to false. A NULL manager is ignored.
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) return;

  ASSERT(pMgr->startIndex >= 0);
  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
  }
  pMgr->startIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->endIndex = 0;
  pMgr->restored = false;
  pMgr->retryBackoff = 0;
}

// Resends in-flight log entries of this replication manager that have waited long enough
// without being acknowledged.
//
// - Nothing in flight (endIndex <= startIndex): returns 0.
// - retryBackoff already at SYNC_MAX_RETRY_BACKOFF: the manager is reset and -1 is returned.
// - Otherwise [startIndex, endIndex) is walked in order. The walk stops at the first entry
//   whose last send time plus the current retry wait has not elapsed yet; acknowledged
//   entries are skipped; the rest are sent again with syncLogBufferReplicateOneTo and
//   stamped with the current time. The walk also stops once the batch limit is exceeded
//   (at most batchSize + 1 entries are resent per call).
// - If anything was resent, retryBackoff is moved to the next level.
//
// Returns 0 on success, -1 when the manager was reset or a resend failed.
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->endIndex <= pMgr->startIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplMgrReset(pMgr);
    sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer: %" PRIx64, pNode->vgId,
          pDestId->addr);
    return -1;
  }

  int32_t  ret = -1;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  // the batch shrinks as the backoff level grows, but never below 1
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;
    ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));

    if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      continue;
    }

    bool barrier = false;
    if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId,
             terrstr(), index, pDestId->addr);
      goto _out;
    }
    ASSERT(barrier == pMgr->states[pos].barrier);
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    retried = true;
    if (firstIndex == -1) firstIndex = index;

    if (batchSize <= count++) {
      break;
    }
  }

  ret = 0;
_out:
  if (retried) {
    pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
    sInfo("vgId:%d, resent %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64
          ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex);
  }
  return ret;
}

// Handles an append-entries reply while the replication manager for that peer is not yet
// restored (pMgr->restored == false, checked with ASSERT).
//
// Outcomes:
//   - the manager is marked restored (nothing was in flight and the peer reports a negative
//     match index, or the peer confirmed exactly the probe that was sent);
//   - a reply outside the in-flight window only triggers syncLogReplMgrRetryOnNeed;
//   - snapshot replication to the peer is started (failed reply whose match index is not
//     below the probed index, or the term needed for comparison is unavailable / differs at
//     the beginning of the log store);
//   - otherwise the manager is reset and a new probe is sent at an index derived from the
//     peer's match index.
//
// Returns 0 on the handled paths above, -1 when starting a snapshot fails, or the result of
// syncLogReplMgrReplicateProbeOnce when a new probe is sent.
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
                                                 SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  ASSERT(pMgr->restored == false);
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  if (pMgr->endIndex == 0) {
    // nothing has been sent to this peer yet
    ASSERT(pMgr->startIndex == 0);
    ASSERT(pMgr->matchIndex == 0);
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  } else {
    // a reply for something outside the in-flight window: just retry what is pending
    if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
      (void)syncLogReplMgrRetryOnNeed(pMgr, pNode);
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
      sWarn("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64,
            pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex);
      if (syncNodeStartSnapshot(pNode, &destId) < 0) {
        sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
      return 0;
    }
  }

  // check last match term
  SyncTerm  term = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);

  if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
    // term of the local entry at `index`, to be compared with the peer's lastMatchTerm
    term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1);
    if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
      ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
      if (syncNodeStartSnapshot(pNode, &destId) < 0) {
        sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
      return 0;
    }

    ASSERT(index + 1 >= firstVer);

    if (term == pMsg->lastMatchTerm) {
      // terms agree at `index`, so the next probe moves one entry forward
      index = index + 1;
      ASSERT(index <= pNode->pLogBuf->matchIndex);
    } else {
      ASSERT(index > firstVer);
    }
  }

  // attempt to replicate the raft log at index
  (void)syncLogReplMgrReset(pMgr);
  return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index);
}

// Handles a heartbeat reply for one peer, under the log buffer mutex.
// When the reply carries a non-zero start time that differs from the one remembered in
// pMgr->peerStartTime, the replication manager is reset and the new start time is stored.
// Always returns 0.
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  taosThreadMutexLock(&pBuf->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplMgrReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

// Entry point for an append-entries reply from one peer, executed under the log buffer mutex.
// If the start time in the reply differs from pMgr->peerStartTime, the replication manager
// is reset and the new start time is stored. The reply is then dispatched to the
// normal-mode or recovery-mode handler depending on pMgr->restored.
// The handler's result is ignored; this function always returns 0.
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  taosThreadMutexLock(&pBuf->mutex);
  if (pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer: %" PRIx64 ", start time:%" PRId64
          ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplMgrReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }

  if (pMgr->restored) {
    (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg);
  } else {
    (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg);
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

766
// Performs one replication step toward the peer of pMgr: a regular attempt when the manager
// is restored, otherwise a probe at the log buffer's current matchIndex.
// The result of the step is ignored; this function always returns 0.
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->restored) {
    (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
  } else {
    (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex);
  }
  return 0;
}

775
// Sends a single probe entry (the log entry at `index`) to the peer of a replication
// manager that is not yet restored (checked with ASSERT).
//
// If a probe is already in flight and was sent less than
// SYNC_LOG_REPL_RETRY_WAIT_MS * 2^SYNC_MAX_RETRY_BACKOFF milliseconds ago, nothing is done
// and 0 is returned. Otherwise the manager is reset, the entry is sent with
// syncLogBufferReplicateOneTo, its state slot is filled, and the in-flight window becomes
// [index, index + 1).
//
// Returns 0 on success or when the send was skipped, -1 when sending fails (the manager
// has already been reset at that point).
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  ASSERT(!pMgr->restored);
  ASSERT(pMgr->startIndex >= 0);
  int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
  int64_t nowMs = taosGetMonoTimestampMs();

  // a recent probe is still pending: do not send another one yet
  if (pMgr->endIndex > pMgr->startIndex &&
      nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  (void)syncLogReplMgrReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;
  if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
    sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
           terrstr(), index, pDestId->addr);
    return -1;
  }

  ASSERT(index >= 0);
  pMgr->states[index % pMgr->size].barrier = barrier;
  pMgr->states[index % pMgr->size].timeMs = nowMs;
  pMgr->states[index % pMgr->size].term = term;
  pMgr->states[index % pMgr->size].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
         ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}

// Replicate a batch of entries to the peer of a restored manager.
//
// Entries are sent one by one starting at pMgr->endIndex and going no further
// than the log buffer's matchIndex. The loop stops early when:
//   - more than batchSize entries have already been sent in this call
//     (batchSize = max(1, size >> (4 + retryBackoff))),
//   - the window [startIndex, index) has reached half of the state ring,
//   - the previously sent entry is a barrier that is not at startIndex, or
//   - the entry just sent is itself a barrier.
// After the loop, syncLogReplMgrRetryOnNeed() is invoked.
//
// `count` is the number of entries actually sent in this call; it is
// incremented only after a successful send so the trace below reports the
// real figure even when the loop breaks early.
//
// Returns 0 on success, -1 as soon as one entry fails to replicate (in which
// case the retry step is not run).
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  ASSERT(pMgr->restored);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t  count = 0;
  int64_t  nowMs = taosGetMonoTimestampMs();
  int64_t  limit = pMgr->size >> 1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // do not send past an outstanding barrier
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }

    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm term = -1;
    if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
      sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
             terrstr(), index, pDestId->addr);
      return -1;
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    pMgr->endIndex = index + 1;
    count++;

    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
            ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
            pMgr->endIndex);
      break;
    }
  }

  syncLogReplMgrRetryOnNeed(pMgr, pNode);

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
         "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}

// Handle an append-entries reply for a restored manager.
//
// A reply whose lastSendIndex lies inside the window [startIndex, endIndex):
//   1. may lower retryBackoff by one, when the window already trails
//      matchIndex, backoff is positive, and the first and last entries of the
//      window were sent within SYNC_LOG_REPL_RETRY_WAIT_MS << (retryBackoff - 1)
//      milliseconds of each other (and not at the same instant);
//   2. marks the replied slot as acked;
//   3. raises matchIndex to the peer's reported matchIndex if that is larger;
//   4. zeroes every state slot in [startIndex, matchIndex) and moves
//      startIndex to matchIndex.
// Replies outside the window leave the state untouched.
//
// In all cases another batch replication is attempted and its result returned.
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  ASSERT(pMgr->restored == true);

  bool replyInWindow = (pMgr->startIndex <= pMsg->lastSendIndex) && (pMsg->lastSendIndex < pMgr->endIndex);
  if (replyInWindow) {
    if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
      int64_t headSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      int64_t tailSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
      int64_t spanMs = tailSentMs - headSentMs;
      if (spanMs > 0 && spanMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
        pMgr->retryBackoff -= 1;
      }
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
    pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

    // release the slots the peer has already matched
    for (SyncIndex done = pMgr->startIndex; done < pMgr->matchIndex; done++) {
      memset(&pMgr->states[done % pMgr->size], 0, sizeof(pMgr->states[0]));
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
}

SSyncLogReplMgr* syncLogReplMgrCreate() {
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
  if (pMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);

895
  ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913

  return pMgr;

_err:
  taosMemoryFree(pMgr);
  return NULL;
}

// Free a replication manager. A NULL pointer is accepted and ignored.
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosMemoryFree(pMgr);
  }
}

// Create one replication manager per replica slot (TSDB_MAX_REPLICA of them)
// and store each in pNode->logReplMgrs with peerId set to its slot number.
// Every slot is asserted to be empty before it is filled.
//
// Returns 0 on success. If a manager cannot be created the function logs the
// failure and returns -1 without touching that slot; managers created for
// earlier slots stay in place and can be released with
// syncNodeLogReplMgrDestroy(), which accepts NULL slots.
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
  for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
    ASSERT(pNode->logReplMgrs[i] == NULL);

    SSyncLogReplMgr* pMgr = syncLogReplMgrCreate();
    if (pMgr == NULL) {
      // check before the first dereference: creation fails on out-of-memory
      sError("vgId:%d, failed to create sync log repl mgr for the %d'th peer since %s", pNode->vgId, i, terrstr());
      return -1;
    }

    pMgr->peerId = i;
    pNode->logReplMgrs[i] = pMgr;
  }
  return 0;
}

// Destroy the replication manager in each of the TSDB_MAX_REPLICA slots of
// pNode and clear the slot afterwards.
void syncNodeLogReplMgrDestroy(SSyncNode* pNode) {
  int slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    syncLogReplMgrDestroy(pNode->logReplMgrs[slot]);
    pNode->logReplMgrs[slot] = NULL;
    slot++;
  }
}

SSyncLogBuffer* syncLogBufferCreate() {
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

938
  ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
939

940 941 942 943 944 945 946 947 948 949 950 951 952
  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    sError("failed to init log buffer mutexattr due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    sError("failed to set log buffer mutexattr type due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
953 954 955 956
    sError("failed to init log buffer mutex due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
957

958 959 960 961 962 963 964
  return pBuf;

_err:
  taosMemoryFree(pBuf);
  return NULL;
}

965 966 967 968 969 970 971 972 973 974 975 976 977
// Drop every entry held in [startIndex, endIndex) and reset all four buffer
// indices (start, commit, match, end) to 0. Runs under the buffer mutex.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cur = pBuf->startIndex;
  while (cur < pBuf->endIndex) {
    int64_t         pos = (cur + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[pos].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      // wipe the whole slot so no dangling pointer is left behind
      memset(&pBuf->entries[pos], 0, sizeof(pBuf->entries[0]));
    }
    cur++;
  }

  pBuf->startIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->endIndex = 0;

  taosThreadMutexUnlock(&pBuf->mutex);
}

978 979 980 981
// Tear down a sync log buffer: clear its entries, destroy the mutex and its
// attribute, then free the buffer itself. A NULL pointer is accepted and
// ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    (void)taosMemoryFree(pBuf);
  }
}

989
// Roll the log buffer and the log store back so that `toIndex` becomes the
// next index to be appended.
//
// Requires commitIndex < toIndex <= endIndex (asserted). The function does
// not take the buffer mutex itself.
//
// Steps:
//   1. destroy buffered entries in [toIndex, endIndex), set endIndex to
//      toIndex and cap matchIndex at toIndex - 1;
//   2. truncate the log store from toIndex when its last index reaches that
//      far, then assert the store ends exactly at toIndex - 1;
//   3. when toIndex <= startIndex, rebuild the buffer through
//      syncLogBufferInitWithoutLock().
//
// Returns 0 on success, -1 if truncation or the rebuild fails.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);

  sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
        ", %" PRId64 ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // trunc buffer, walking backwards from the last entry down to toIndex
  SyncIndex index;
  for (index = pBuf->endIndex - 1; index >= toIndex; index--) {
    int64_t         pos = index % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[pos].pItem;
    if (pItem == NULL) {
      continue;
    }
    (void)syncEntryDestroy(pItem);
    memset(&pBuf->entries[pos], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
  ASSERT(index + 1 == toIndex);

  // trunc wal
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    if (pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex) < 0) {
      sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, terrstr(), toIndex);
      return -1;
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(toIndex == lastVer + 1);

  // refill buffer on need
  if (toIndex <= pBuf->startIndex) {
    if (syncLogBufferInitWithoutLock(pBuf, pNode) < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
      return -1;
    }
  }

  ASSERT(pBuf->endIndex == toIndex);
  syncLogBufferValidate(pBuf);
  return 0;
}

// Reset the log buffer so that it ends right after matchIndex, and reset the
// replication manager of every replica (the first pNode->replicaNum slots).
//
// Runs under the buffer mutex. The log store's last index is asserted to
// equal matchIndex on entry. The result of the rollback is deliberately
// ignored and endIndex is forced to matchIndex + 1 afterwards, so the
// function always returns 0.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer == pBuf->matchIndex);

  (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);

  sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->replicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplMgrReset(pMgr);
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

// Look up the raft entry at `index`.
//
//   - index >= endIndex: returns NULL and leaves *pInBuf untouched.
//   - index >  startIndex: returns the buffered entry (possibly NULL) and
//     sets *pInBuf to true.
//   - otherwise: sets *pInBuf to false and asks the log store for the entry;
//     a failure is logged and whatever the store left in the output pointer
//     (initially NULL) is returned.
//
// NOTE(review): callers in this file destroy the entry only when *pInBuf is
// false, which suggests store-loaded entries are caller-owned — confirm.
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) {
  if (index >= pBuf->endIndex) {
    return NULL;
  }

  if (index > pBuf->startIndex) {  // startIndex might be dummy
    *pInBuf = true;
    return pBuf->entries[index % pBuf->size].pItem;
  }

  *pInBuf = false;
  SSyncRaftEntry* pLoaded = NULL;
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pLoaded) < 0) {
    sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
  }
  return pLoaded;
}

// Build an append-entries message for the entry at `index` and send it to
// pDestId.
//
// Outputs:
//   *pBarrier - whether the entry is a replication barrier
//               (written once the entry has been found)
//   *pTerm    - term of the entry (written only when pTerm is not NULL and
//               the previous log term has been resolved)
//
// If the entry cannot be found and terrno is TSDB_CODE_WAL_LOG_NOT_EXIST, the
// replication manager registered for pDestId is reset before failing.
//
// An entry that did not come from the log buffer (inBuf == false) is
// destroyed here on both the success and the error path. On the error path
// the message payload is released as well.
//
// Returns 0 once the message has been handed to syncNodeSendAppendEntries()
// (whose own result is ignored), -1 on any earlier failure.
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
                                    SRaftId* pDestId, bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index);
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // named differently from the pMgr parameter to avoid shadowing it
      SSyncLogReplMgr* pDestMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pDestMgr) {
        sInfo("vgId:%d, reset sync log repl mgr of peer: %" PRIx64 " since %s. index: %" PRId64, pNode->vgId,
              pDestId->addr, terrstr(), index);
        (void)syncLogReplMgrReset(pDestMgr);
      }
    }
    goto _err;
  }
  *pBarrier = syncLogIsReplicationBarrier(pEntry);

  prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index);
  if (prevLogTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  if (syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut) < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
    goto _err;
  }

  (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut);

  sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
         pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  // pEntry is NULL when the lookup itself failed; only destroy a real entry
  if (!inBuf && pEntry != NULL) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return -1;
}