syncPipeline.c 42.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE

18
#include "syncPipeline.h"
19
#include "syncCommit.h"
20
#include "syncIndexMgr.h"
21 22 23 24
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
25 26
#include "syncRespMgr.h"
#include "syncSnapshot.h"
27 28
#include "syncUtil.h"

29 30 31 32 33
// Tells whether `type` is one of the vnode message types treated as
// "blocking" by this file: create/alter/drop table, update tag value,
// or alter confirm.
static bool syncIsMsgBlock(tmsg_t type) {
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_TABLE) return true;
  if (type == TDMT_VND_DROP_TABLE) return true;
  if (type == TDMT_VND_UPDATE_TAG_VAL) return true;
  return type == TDMT_VND_ALTER_CONFIRM;
}

34 35 36 37
// Upper bound of the replication retry wait, in milliseconds:
// SYNC_LOG_REPL_RETRY_WAIT_MS scaled by 2^SYNC_MAX_RETRY_BACKOFF.
// The arithmetic is carried out in int64_t so the result cannot overflow
// `int` before it is widened to the return type.
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs(void) {
  return (int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS * ((int64_t)1 << SYNC_MAX_RETRY_BACKOFF);
}

38 39 40 41 42 43 44 45 46 47 48 49 50
// Reads pBuf->endIndex while holding the buffer mutex and returns the
// value observed.
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex = 0;

  taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}

// Appends pEntry to the tail of the log buffer, i.e. at slot pEntry->index,
// which is asserted to equal pBuf->endIndex.
//
// The append is rejected (returns -1 with terrno set) when:
//   - the index falls outside the ring buffer (TSDB_CODE_SYN_BUFFER_FULL);
//   - restore has finished and the index is TSDB_SYNC_NEGOTIATION_WIN or more
//     ahead of the commit index (TSDB_CODE_SYN_NEGO_WIN_EXCEEDED);
//   - restore has finished and the commit index is TSDB_SYNC_APPLYQ_SIZE_LIMIT
//     or more ahead of the index reported by FpAppliedIndexCb
//     (TSDB_CODE_SYN_WRITE_STALL).
//
// On success the buffer slot stores the pEntry pointer together with the
// index/term of its predecessor, endIndex is advanced, and 0 is returned.
// On failure pEntry is not stored and not destroyed by this function.
// The buffer mutex is held for the whole operation except the final sleep.
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
    goto _err;
  }

  if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(),
           index, pBuf->commitIndex);
    goto _err;
  }

  SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
    terrno = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
    goto _err;
  }

  ASSERT(index == pBuf->endIndex);

  // the target slot must be free
  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  ASSERT(pExist == NULL);

  // initial log buffer with at least one item, e.g. commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  ASSERTS(pMatch != NULL, "no matched log entry");
  ASSERT(pMatch->index + 1 == index);
  ASSERT(pMatch->term <= pEntry->term);

  // record the predecessor's index/term alongside the new entry
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_err:
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  // sleep after releasing the mutex
  taosMsleep(1);
  return -1;
}

97
// Looks up the term of the log entry at `index - 1`.
//
// Sources are consulted in this order:
//   1. the in-memory log buffer, when index - 1 >= pBuf->startIndex;
//   2. the replication manager's state window, when pMgr is non-NULL and
//      index - 1 lies in [pMgr->startIndex, pMgr->endIndex);
//   3. the snapshot info from FpGetSnapshotInfo, when index - 1 equals
//      snapshot.lastApplyIndex;
//   4. the log store via syncLogGetEntry.
//
// Returns 0 when index - 1 == -1 and the log store begins at index 0.
// Returns -1 with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST when index - 1 is
// beyond pBuf->matchIndex or no source has the entry. terrno is reset to
// TSDB_CODE_SUCCESS on entry.
// NOTE(review): pBuf fields are read without taking pBuf->mutex here;
// presumably callers hold it — confirm.
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  terrno = TSDB_CODE_SUCCESS;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) return 0;

  if (prevIndex > pBuf->matchIndex) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  ASSERT(index - 1 == prevIndex);

  // 1. in-memory log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    ASSERTS(pEntry != NULL, "no log entry found");
    prevLogTerm = pEntry->term;
    return prevLogTerm;
  }

  // 2. replication manager window
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    ASSERTS(timeMs != 0, "no log entry found");
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    ASSERT(prevIndex == 0 || prevLogTerm != 0);
    return prevLogTerm;
  }

  // 3. snapshot
  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    return snapshot.lastApplyTerm;
  }

  // 4. log store; the fetched entry is destroyed here after reading its term
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    return prevLogTerm;
  }

  // log whatever terrstr() reports at this point before terrno is overwritten below
  sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

// Builds the placeholder ("dummy") entry used by the log buffer; this simply
// forwards to syncEntryBuildNoop with the same term, index and vgId.
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}

150 151 152
// Checks that the log store's index range is compatible with `commitIndex`:
// the first stored index must not exceed commitIndex + 1, and the last stored
// index must not be below commitIndex.
// Returns 0 when both conditions hold, -1 (after logging) otherwise.
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  if (firstVer > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:%" PRId64
           ", tsdb commit version:%" PRId64 "",
           pNode->vgId, firstVer, commitIndex);
    return -1;
  }

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%" PRId64
           ", tsdb commit version:%" PRId64 "",
           pNode->vgId, lastVer, commitIndex);
    return -1;
  }

  return 0;
}

170
// Populates the log buffer from the snapshot info and the log store.
// The function itself does not take pBuf->mutex.
//
// Steps:
//   - take commitIndex/commitTerm from FpGetSnapshotInfo (term clamped to >= 0);
//   - fail with TSDB_CODE_WAL_LOG_INCOMPLETE if the log store range does not
//     line up with commitIndex (see syncLogValidateAlignmentOfCommit);
//   - set commitIndex, matchIndex (= last stored index) and endIndex;
//   - walk the log store backwards from the last index, placing entries in the
//     ring while at most (size - 5) entries have been taken, and filling in
//     prevLogIndex/prevLogTerm of each successor slot;
//   - if the walk reaches commitIndex, place a dummy entry there;
//   - set startIndex to the lowest index present in the buffer.
//
// Returns 0 on success, -1 on failure (terrno set).
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    goto _err;
  }

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer >= commitIndex);
  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      break;
    }

    // keep the entry only while the ring still has `emptySize` free slots left
    bool taken = false;
    int  emptySize = 5;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
    }

    // the successor slot learns its predecessor's index/term from this entry
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    ASSERT(index == pBuf->commitIndex);

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  syncLogBufferValidate(pBuf);
  return 0;

_err:
  return -1;
}

266 267 268 269 270 271 272 273 274
// Mutex-holding wrapper around syncLogBufferInitWithoutLock; returns that
// function's result unchanged.
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = 0;

  taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}

// Discards everything currently held by the log buffer and rebuilds it via
// syncLogBufferInitWithoutLock, all under the buffer mutex.
//
// Every non-NULL entry in [startIndex, endIndex) is destroyed and its slot
// zeroed, then all four indices are reset to 0 before re-initialising.
// Returns the result of syncLogBufferInitWithoutLock (negative on failure).
//
// The buffer invariants are re-checked only when re-initialisation succeeds:
// a failed init can return before any index is set again, leaving
// matchIndex == endIndex == 0, which syncLogBufferValidate would reject even
// though the failure is already being reported through the return value.
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) continue;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (ret < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
  } else {
    syncLogBufferValidate(pBuf);
  }
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

293
// Returns the term of the entry stored at pBuf->matchIndex.
// Does not take pBuf->mutex; the slot is asserted to hold an entry.
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SyncIndex       index = pBuf->matchIndex;
  SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
  ASSERT(pEntry != NULL);
  return pEntry->term;
}

300 301 302 303 304 305 306
// Mutex-holding wrapper around syncLogBufferGetLastMatchTermWithoutLock.
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm;

  taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}

307 308 309 310 311 312 313
// Reports, under the buffer mutex, whether the half-open range
// [startIndex, endIndex) contains no index.
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  bool hasEntries;

  taosThreadMutexLock(&pBuf->mutex);
  hasEntries = (pBuf->startIndex < pBuf->endIndex);
  taosThreadMutexUnlock(&pBuf->mutex);

  return !hasEntries;
}

314 315 316
// Tries to place a received entry into the log buffer at slot pEntry->index.
//
// pEntry   : the received entry. It is either stored in the buffer (success
//            path that reaches "update") or destroyed at _out; the caller no
//            longer owns it after this call in either case.
// prevTerm : term the sender claims for the entry at pEntry->index - 1.
//
// Outcomes, in the order checked:
//   - index <= commitIndex: returns 0 only if the local term at that index
//     equals pEntry->term, otherwise -1;
//   - (learner role only) records the highest index seen in pBuf->totalIndex;
//   - index outside the ring window: -1;
//   - index > matchIndex but prevTerm differs from the term at matchIndex: -1;
//   - an entry already exists at index: same term -> duplicate, returns 0;
//     different term -> syncLogBufferRollback is invoked and the new entry is
//     then stored;
//   - otherwise the entry is stored and endIndex is raised if needed; returns 0.
//
// The buffer mutex is held for the whole call.
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  int32_t         ret = -1;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    // term of the local entry at `index` (the "previous" entry of index + 1)
    SyncTerm term = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1);
    ASSERT(pEntry->term >= 0);
    if (term == pEntry->term) {
      ret = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    goto _out;
  }

  // check current in buffer
  pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
  if (pExist != NULL) {
    ASSERT(pEntry->index == pExist->index);
    if (pEntry->term != pExist->term) {
      // conflicting entry: roll back from `index`; the result is deliberately ignored here
      (void)syncLogBufferRollback(pBuf, pNode, index);
    } else {
      sTrace("vgId:%d, duplicate log entry received. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = syncLogReplGetPrevLogTerm(NULL, pNode, index);
      ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
      ret = 0;
      goto _out;
    }
  }

  // update
  ASSERT(pBuf->startIndex < index);
  ASSERT(index - pBuf->startIndex < pBuf->size);
  ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  // ownership moved into the buffer; clear the local so _out does not destroy it
  pEntry = NULL;
  pBuf->entries[index % pBuf->size] = tmp;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  ret = 0;

_out:
  syncEntryDestroy(pEntry);
  // pExist is destroyed here only when it was not served from the buffer
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

405 406 407 408 409
// True only when there is more than one replica and the entry carries a
// TDMT_VND_COMMIT message; pEntry is not inspected for replicaNum <= 1.
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}

// Writes pEntry to the log store at position pEntry->index.
//
// If the store already holds entries at or beyond that index, it is first
// truncated from pEntry->index. The entry is then appended; the fsync flag
// passed to syncLogAppendEntry comes from syncLogStoreNeedFlush.
// Returns 0 on success, -1 if truncation or append fails.
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  ASSERT(pEntry->index >= 0);
  SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
  if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
    sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
    return -1;
  }
  // the entry must land directly after the current last index
  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer + 1);

  bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) {
    sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
           pEntry->term);
    return -1;
  }

  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == lastVer);
  return 0;
}

431
// Advances pBuf->matchIndex over entries that are already in the buffer.
//
// Starting at matchIndex + 1, each entry is accepted when the prevLogTerm
// recorded in its slot equals the term of the entry at the current
// matchIndex. For every accepted entry the function bumps matchIndex, calls
// syncNodeReplicateWithoutLock, persists the entry through
// syncLogStorePersist and updates this node's slot in pNode->pMatchIndex.
// The walk stops at the first empty slot, term mismatch or persist failure;
// on exit pBuf->matchIndex is set to the last index that was persisted.
//
// pMatchTerm : optional out-parameter receiving the term of the entry at the
//              returned match index.
// Returns the resulting match index. The buffer mutex is held throughout.
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    ASSERT(index >= 0);

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      goto _out;
    }

    ASSERT(index == pEntry->index);

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    ASSERT(pMatch != NULL);
    ASSERT(pMatch->index == pBuf->matchIndex);
    ASSERT(pMatch->index + 1 == pEntry->index);
    ASSERT(prevLogIndex == pMatch->index);

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, mismatching sync log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
           pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // replicate on demand
    (void)syncNodeReplicateWithoutLock(pNode);

    // persist
    if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
      sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
             pEntry->index);
      goto _out;
    }
    ASSERT(pEntry->index == pBuf->matchIndex);

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  // undo a tentative bump if the last entry was not persisted
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    *pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return matchIndex;
}

504
// Hands one log entry to the state machine's commit callback.
//
// Returns 0 without calling the FSM when the node has a single replica, has
// finished restoring, and its vgId is not 1. Otherwise the entry is converted
// back to its original RPC message (syncEntry2OriginalRpc), an SFsmCbMeta is
// filled from the entry plus `role`, `term` and `applyCode`, the response
// info registered under the entry's seqNum is fetched-and-removed from
// pNode->pSyncRespMgr into rpcMsg.info, and FpCommitCb is invoked.
//
// Returns whatever FpCommitCb returns.
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode) {
  if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, fsm execute vnode commit. index:%" PRId64 ", term:%" PRId64 "", pNode->vgId, pEntry->index,
          pEntry->term);
  }

  SRpcMsg rpcMsg = {.code = applyCode};
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  SFsmCbMeta cbMeta = {0};
  cbMeta.index = pEntry->index;
  cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
  cbMeta.isWeak = pEntry->isWeak;
  cbMeta.code = applyCode;
  cbMeta.state = role;
  cbMeta.seqNum = pEntry->seqNum;
  cbMeta.term = pEntry->term;
  cbMeta.currentTerm = term;
  cbMeta.flag = -1;

  // the lookup result is intentionally ignored
  (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
  int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
  return code;
}

// Asserts the log buffer invariants:
//   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
//   the window [startIndex, endIndex) fits in the ring, and the slot at
//   matchIndex holds an entry.
// Always returns 0; violations are reported only through ASSERT.
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex < pBuf->endIndex);
  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
  ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem);
  return 0;
}

// Applies log entries up to min(commitIndex, pBuf->matchIndex) to the state
// machine and then recycles old buffer slots.
//
// For each index after pBuf->commitIndex the entry is fetched with
// syncLogBufferGetOneEntry, executed through syncFsmExecute, and
// pBuf->commitIndex is advanced. Entries that were not served from the buffer
// (inBuf == false) are destroyed after use. Afterwards, slots older than
// commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION are destroyed and startIndex
// is moved forward.
//
// At exit, if the node has not finished restoring, the buffer's commit index
// has reached pNode->commitIndex, and the last entry still referenced has a
// term >= the current term, FpRestoreFinishCb is called and
// pNode->restoreFinish is set.
//
// Returns 0 on success or when commitIndex is stale; -1 if an entry could not
// be fetched or executed. The buffer mutex is held throughout.
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         ret = -1;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    ret = 0;
    goto _out;
  }

  sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64,
         pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sInfo("vgId:%d, commit sync barrier. index:%" PRId64 ", term:%" PRId64 ", type:%s", vgId, pEntry->index,
            pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
      sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
             ", role:%d, current term:%" PRId64,
             vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
           pEntry->index, pEntry->term, role, currentTerm);

    // entries fetched from outside the buffer are released right away
    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  for (SyncIndex index = pBuf->startIndex; index < until; index++) {
    // distinct name: must not shadow the function-level pEntry used at _out
    SSyncRaftEntry* pOld = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    ASSERT(pOld != NULL);
    syncEntryDestroy(pOld);
    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    pBuf->startIndex = index + 1;
  }

  ret = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}

633
// Wipe the state slots of the replication window [startIndex, endIndex) and put the manager back
// into its initial, not-restored state. A NULL manager is silently ignored.
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    ASSERT(pMgr->startIndex >= 0);

    // zero every slot that belongs to the current window
    SyncIndex cursor = pMgr->startIndex;
    while (cursor < pMgr->endIndex) {
      memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
      cursor++;
    }

    // collapse the window and forget all progress
    pMgr->startIndex = 0;
    pMgr->matchIndex = 0;
    pMgr->endIndex = 0;
    pMgr->restored = false;
    pMgr->retryBackoff = 0;
  }
}

647
// Resend the unacknowledged entries of the replication window [startIndex, endIndex) whose retry
// wait time has elapsed.
//   pMgr  - replication manager of one peer
//   pNode - local sync node (provides the peer address table and the log buffer)
// Returns 0 when the window is empty or the scan finished without error; -1 when the manager was
// reset (backoff limit reached, or an acked-but-unmatched entry stagnated) or when a send failed.
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  // nothing in flight
  if (pMgr->endIndex <= pMgr->startIndex) {
    return 0;
  }

  SRaftId* pPeer = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
          pPeer->addr);
    return -1;
  }

  int32_t  code = -1;
  bool     resent = false;
  int64_t  waitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  // the batch shrinks as the backoff level grows
  int64_t  maxBatch = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex idx = pMgr->startIndex; idx < pMgr->endIndex; idx++) {
    int64_t slot = idx % pMgr->size;
    ASSERT(!pMgr->states[slot].barrier || (idx == pMgr->startIndex || idx + 1 == pMgr->endIndex));

    // stop at the first entry whose retry wait has not elapsed yet
    if (nowMs < pMgr->states[slot].timeMs + waitMs) {
      break;
    }

    if (pMgr->states[slot].acked) {
      // acked but still beyond matchIndex for far too long: give up and start over
      bool stagnant = pMgr->matchIndex < idx && pMgr->states[slot].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs;
      if (!stagnant) {
        continue;
      }
      syncLogReplReset(pMgr);
      sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, idx,
            pPeer->addr);
      goto _out;
    }

    bool isBarrier = false;
    if (syncLogReplSendTo(pMgr, pNode, idx, &term, pPeer, &isBarrier) < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
             terrstr(), idx, pPeer->addr);
      goto _out;
    }
    ASSERT(isBarrier == pMgr->states[slot].barrier);

    // record the fresh send attempt
    pMgr->states[slot].acked = false;
    pMgr->states[slot].term = term;
    pMgr->states[slot].timeMs = nowMs;

    resent = true;
    if (firstIndex == -1) {
      firstIndex = idx;
    }

    // compare against the count before this send, then account for it
    bool batchDone = (maxBatch <= count);
    count++;
    if (batchDone) {
      break;
    }
  }

  code = 0;

_out:
  if (resent) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64
          ", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pPeer->addr, firstIndex, term, waitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pLogBuf->startIndex, pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  }
  return code;
}

720
// Handle an append-entries reply while the replication manager is not restored yet.
// Depending on the reply the manager is marked restored, snapshot replication is started for the
// peer, or the manager is reset and a new probe is sent at an index derived from the reply.
// Returns 0 on success, -1 if starting the snapshot failed; otherwise the result of syncLogReplProbe().
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  SRaftId         peer = pMsg->srcId;
  ASSERT(!pMgr->restored);

  if (pMgr->endIndex != 0) {
    // the reply does not belong to anything in the current window
    bool outOfWindow = pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex;
    if (outOfWindow) {
      syncLogReplRetryOnNeed(pMgr, pNode);
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    // the peer accepted exactly what was sent last: recovery is complete
    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&peer), peer.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pLogBuf->startIndex, pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
      return 0;
    }

    // the peer rejected the entry but did not move its match index below the sent one
    if (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex) {
      sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index:%" PRId64 ", last sent:%" PRId64,
            pNode->vgId, DID(&peer), pMsg->matchIndex, pMsg->lastSendIndex);
      if (syncNodeStartSnapshot(pNode, &peer) < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&peer));
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&peer));
      return 0;
    }
  } else {
    // nothing has been sent since the last reset
    ASSERT(pMgr->startIndex == 0);
    ASSERT(pMgr->matchIndex == 0);
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&peer), peer.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pLogBuf->startIndex, pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
      return 0;
    }
  }

  // check last match term
  SyncTerm  prevTerm = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex probeIndex = TMIN(pMsg->matchIndex, pLogBuf->matchIndex);

  if (pMsg->matchIndex < pLogBuf->matchIndex) {
    prevTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, probeIndex + 1);

    bool belowFirstVer = (probeIndex + 1 < firstVer);
    bool termMissing = (prevTerm < 0);
    bool mismatchAtHead =
        (prevTerm != pMsg->lastMatchTerm) && (probeIndex + 1 == firstVer || probeIndex == firstVer);
    if (belowFirstVer || termMissing || mismatchAtHead) {
      ASSERT(prevTerm >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
      if (syncNodeStartSnapshot(pNode, &peer) < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&peer));
        return -1;
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&peer));
      return 0;
    }

    ASSERT(probeIndex + 1 >= firstVer);

    if (prevTerm != pMsg->lastMatchTerm) {
      ASSERT(probeIndex > firstVer);
    } else {
      // terms agree at the peer's match index: probe with the entry right after it
      probeIndex = probeIndex + 1;
      ASSERT(probeIndex <= pLogBuf->matchIndex);
    }
  }

  // attempt to replicate the raft log at the chosen index
  (void)syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, probeIndex);
}

799
// Handle a heartbeat reply: when the reply carries a non-zero start time that differs from the one
// recorded for the peer, reset the replication manager and remember the new start time.
// Runs under the log buffer mutex. Always returns 0.
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;

  taosThreadMutexLock(&pLogBuf->mutex);

  bool startTimeChanged = (pMsg->startTime != 0) && (pMsg->startTime != pMgr->peerStartTime);
  if (startTimeChanged) {
    sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }

  taosThreadMutexUnlock(&pLogBuf->mutex);
  return 0;
}

812
// Handle an append-entries reply under the log buffer mutex.
// A start time different from the recorded one resets the manager first; the reply is then
// dispatched to the normal or the recovery handler according to pMgr->restored.
// The handler result is discarded; always returns 0.
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;

  taosThreadMutexLock(&pLogBuf->mutex);

  if (pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }

  if (!pMgr->restored) {
    (void)syncLogReplProcessReplyAsRecovery(pMgr, pNode, pMsg);
  } else {
    (void)syncLogReplProcessReplyAsNormal(pMgr, pNode, pMsg);
  }

  taosThreadMutexUnlock(&pLogBuf->mutex);
  return 0;
}

831
// Run one replication step for a peer: probe at the log buffer's match index while the manager is
// not restored, otherwise attempt regular replication. The step's result is discarded; returns 0.
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    (void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
    return 0;
  }

  (void)syncLogReplAttempt(pMgr, pNode);
  return 0;
}

840
// Send a single probing entry at `index` to the peer of a not-yet-restored replication manager.
// If a previous send is still within the maximum retry wait, nothing is sent. Otherwise the
// manager is reset and, after a successful send, its window becomes [index, index + 1).
// Returns 0 on success or when the probe was skipped, -1 if the send failed.
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  ASSERT(!pMgr->restored);
  ASSERT(pMgr->startIndex >= 0);

  int64_t maxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();

  // an earlier send is outstanding and has not timed out yet
  bool pending = pMgr->endIndex > pMgr->startIndex;
  if (pending && nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + maxWaitMs) {
    return 0;
  }
  (void)syncLogReplReset(pMgr);

  SRaftId* pPeer = &pNode->replicasId[pMgr->peerId];
  bool     isBarrier = false;
  SyncTerm term = -1;
  if (syncLogReplSendTo(pMgr, pNode, index, &term, pPeer, &isBarrier) < 0) {
    sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
           terrstr(), index, pPeer->addr);
    return -1;
  }

  ASSERT(index >= 0);

  // record the probe in its slot and shrink the window to just this entry
  int64_t slot = index % pMgr->size;
  pMgr->states[slot].barrier = isBarrier;
  pMgr->states[slot].timeMs = nowMs;
  pMgr->states[slot].term = term;
  pMgr->states[slot].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". mgr (rs:%d): [%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pPeer->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pLogBuf->startIndex, pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  return 0;
}

878
// Replicate new entries, from pMgr->endIndex up to the log buffer's matchIndex, to the peer of a
// restored replication manager, then resend timed-out entries via syncLogReplRetryOnNeed().
//   pMgr  - replication manager of one peer (must be restored)
//   pNode - local sync node
// The loop stops when the batch budget is exceeded, when the distance from startIndex reaches half
// of the manager's capacity, when the previous entry is a barrier, or right after a barrier is sent.
// Returns 0 on success, -1 if sending an entry failed.
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  ASSERT(pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // nothing may follow an unconfirmed barrier
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }

    int64_t pos = index % pMgr->size;
    bool    barrier = false;
    // NOTE: `term` and `pDestId` are deliberately the function-level variables. Redeclaring them
    // here would shadow them and make the summary trace below always report term -1.
    if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
      sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
             terrstr(), index, pDestId->addr);
      return -1;
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) firstIndex = index;
    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dest:%" PRIx64 ". index:%" PRId64 ", term:%" PRId64
            ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
            pMgr->endIndex);
      break;
    }
  }

  // best effort: the retry result is intentionally not propagated
  syncLogReplRetryOnNeed(pMgr, pNode);

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
         ", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
         pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}

934
// Handle an append-entries reply for a restored replication manager.
// A reply that falls inside the window [startIndex, endIndex) acks its slot, may lower the retry
// backoff by one level, advances matchIndex and releases the slots below it. In every case
// replication is continued afterwards. Returns the result of syncLogReplAttempt().
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  ASSERT(pMgr->restored);

  bool inWindow = pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex;
  if (!inWindow) {
    return syncLogReplAttempt(pMgr, pNode);
  }

  if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
    // send-time spread between the first and the last entry of the window
    int64_t headMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
    int64_t tailMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
    int64_t spanMs = tailMs - headMs;
    int64_t thresholdMs = (int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1);
    if (spanMs > 0 && spanMs < thresholdMs) {
      pMgr->retryBackoff -= 1;
    }
  }

  pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
  pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

  // release every slot below the new match index and move the window start up to it
  SyncIndex cursor = pMgr->startIndex;
  while (cursor < pMgr->matchIndex) {
    memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
    cursor++;
  }
  pMgr->startIndex = pMgr->matchIndex;

  return syncLogReplAttempt(pMgr, pNode);
}

956
SSyncLogReplMgr* syncLogReplCreate() {
957 958 959 960 961 962 963 964
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
  if (pMgr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);

965
  ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
966 967 968 969

  return pMgr;
}

970
// Free a replication manager; a NULL pointer is ignored.
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosMemoryFree(pMgr);
  }
}

978
// Create one replication manager for each of the TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA
// slots of the node and store the slot number as the manager's peerId.
// Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if a manager cannot be created.
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  int slot = 0;
  while (slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    ASSERT(pNode->logReplMgrs[slot] == NULL);

    pNode->logReplMgrs[slot] = syncLogReplCreate();
    if (pNode->logReplMgrs[slot] == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    pNode->logReplMgrs[slot]->peerId = slot;
    slot++;
  }
  return 0;
}

991
// Destroy every replication manager of the node and clear the corresponding slots.
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  int slot = 0;
  while (slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    syncLogReplDestroy(pNode->logReplMgrs[slot]);
    pNode->logReplMgrs[slot] = NULL;
    slot++;
  }
}

SSyncLogBuffer* syncLogBufferCreate() {
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

1007
  ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
1008

1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021
  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    sError("failed to init log buffer mutexattr due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    sError("failed to set log buffer mutexattr type due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
1022 1023 1024 1025
    sError("failed to init log buffer mutex due to %s", strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
1026

1027 1028 1029 1030 1031 1032 1033
  return pBuf;

_err:
  taosMemoryFree(pBuf);
  return NULL;
}

1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046
// Under the buffer mutex, destroy every entry held in [startIndex, endIndex), zero its slot and
// reset all four buffer indexes to 0.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  taosThreadMutexLock(&pBuf->mutex);

  for (SyncIndex cursor = pBuf->startIndex; cursor < pBuf->endIndex; cursor++) {
    SSyncRaftEntry* pItem = pBuf->entries[(cursor + pBuf->size) % pBuf->size].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      memset(&pBuf->entries[(cursor + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    }
  }

  pBuf->startIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->endIndex = 0;

  taosThreadMutexUnlock(&pBuf->mutex);
}

1047 1048 1049 1050
// Release a log buffer: clear its entries, destroy the mutex and its attribute, then free the
// buffer itself. A NULL pointer is ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    (void)taosMemoryFree(pBuf);
  }
}

1058
// Truncate the log buffer and the log store so that `toIndex` becomes the buffer's end index.
//   toIndex - must satisfy commitIndex < toIndex <= endIndex
// Entries in [toIndex, endIndex) are destroyed, the log store is truncated from toIndex when it
// still holds such entries, and the buffer is re-initialised if toIndex is not above startIndex.
// Returns 0 on success (or when there is nothing to roll back), -1 on truncate or refill failure.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);

  // already at the requested end
  if (toIndex == pBuf->endIndex) {
    return 0;
  }

  sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
        ", %" PRId64 ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // trunc buffer: walk backwards from the last entry down to toIndex
  SyncIndex cursor = pBuf->endIndex - 1;
  for (; cursor >= toIndex; cursor--) {
    SSyncRaftEntry* pItem = pBuf->entries[cursor % pBuf->size].pItem;
    if (pItem == NULL) {
      continue;
    }
    (void)syncEntryDestroy(pItem);
    memset(&pBuf->entries[cursor % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, cursor);
  ASSERT(cursor + 1 == toIndex);

  // trunc wal
  SyncIndex storeLast = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (storeLast >= toIndex && pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex) < 0) {
    sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, terrstr(), toIndex);
    return -1;
  }
  storeLast = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(toIndex == storeLast + 1);

  // refill buffer on need
  if (toIndex <= pBuf->startIndex) {
    if (syncLogBufferInitWithoutLock(pBuf, pNode) < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
      return -1;
    }
  }

  ASSERT(pBuf->endIndex == toIndex);
  syncLogBufferValidate(pBuf);
  return 0;
}

// Under the buffer mutex, drop everything after the buffer's match index and reset the replication
// managers of the first pNode->totalReplicaNum replicas.
// The log store's last index is asserted to equal the buffer's match index on entry.
// Always returns 0; the rollback result is deliberately discarded, as before.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer == pBuf->matchIndex);

  (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);

  sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}

// Fetch the entry at `index`.
//   pInBuf - set to true when the entry is taken from the buffer slot, false when it is read
//            from the log store; left untouched when index >= endIndex
// Indexes above startIndex are served from the buffer; all others go to the log store.
// Returns the entry, or NULL when index >= endIndex. A failed log-store read is logged.
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) {
  if (index >= pBuf->endIndex) {
    return NULL;
  }

  // startIndex might be dummy, so only indexes strictly above it are read from the buffer
  if (index > pBuf->startIndex) {
    *pInBuf = true;
    return pBuf->entries[index % pBuf->size].pItem;
  }

  SSyncRaftEntry* pFromStore = NULL;
  *pInBuf = false;
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pFromStore) < 0) {
    sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
  }
  return pFromStore;
}

1148 1149
// Build an append-entries message for the entry at `index` and send it to `pDestId`.
//   pTerm    - if not NULL, receives the term of the entry
//   pBarrier - receives the result of syncLogReplBarrier() for the entry
// When the entry cannot be fetched and terrno is TSDB_CODE_WAL_LOG_NOT_EXIST, the replication
// manager looked up for pDestId is reset.
// An entry that did not come from the buffer is destroyed before returning.
// Returns 0 once the message has been handed to syncNodeSendAppendEntries(), -1 on any failure.
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  SRpcMsg         rpcMsg = {0};
  bool            fromBuf = false;
  SyncTerm        prevTerm = -1;
  int32_t         code = 0;

  SSyncRaftEntry* pEntry = syncLogBufferGetOneEntry(pLogBuf, pNode, index, &fromBuf);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to get raft entry for index:%" PRId64 "", pNode->vgId, index);
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr != NULL) {
        sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
              terrstr(), index);
        (void)syncLogReplReset(pPeerMgr);
      }
    }
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  prevTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index);
  if (prevTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
    goto _err;
  }
  if (pTerm != NULL) {
    *pTerm = pEntry->term;
  }

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevTerm, &rpcMsg);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
    goto _err;
  }

  (void)syncNodeSendAppendEntries(pNode, pDestId, &rpcMsg);

  sTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64,
         pNode->vgId, pEntry->index, pEntry->term, prevTerm, pDestId->addr);

  // entries read from the log store are owned by this function
  if (!fromBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(rpcMsg.pCont);
  rpcMsg.pCont = NULL;
  if (!fromBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return -1;
}