syncReplication.c 19.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "syncReplication.h"
M
Minghao Li 已提交
17
#include "syncIndexMgr.h"
M
Minghao Li 已提交
18
#include "syncMessage.h"
M
Minghao Li 已提交
19
#include "syncRaftCfg.h"
M
Minghao Li 已提交
20
#include "syncRaftEntry.h"
M
Minghao Li 已提交
21
#include "syncRaftLog.h"
M
Minghao Li 已提交
22
#include "syncRaftStore.h"
23
#include "syncSnapshot.h"
M
Minghao Li 已提交
24
#include "syncUtil.h"
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
// TLA+ Spec
// AppendEntries(i, j) ==
//    /\ i /= j
//    /\ state[i] = Leader
//    /\ LET prevLogIndex == nextIndex[i][j] - 1
//           prevLogTerm == IF prevLogIndex > 0 THEN
//                              log[i][prevLogIndex].term
//                          ELSE
//                              0
//           \* Send up to 1 entry, constrained by the end of the log.
//           lastEntry == Min({Len(log[i]), nextIndex[i][j]})
//           entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
//       IN Send([mtype          |-> AppendEntriesRequest,
//                mterm          |-> currentTerm[i],
//                mprevLogIndex  |-> prevLogIndex,
//                mprevLogTerm   |-> prevLogTerm,
//                mentries       |-> entries,
//                \* mlog is used as a history variable for the proof.
//                \* It would not exist in a real implementation.
//                mlog           |-> log[i],
//                mcommitIndex   |-> Min({commitIndex[i], lastEntry}),
//                msource        |-> i,
//                mdest          |-> j])
//    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
M
Minghao Li 已提交
50
//
M
Minghao Li 已提交
51
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
52
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
M
Minghao Li 已提交
53

M
Minghao Li 已提交
54 55 56 57
  syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex);
  syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex);
  logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore);

M
Minghao Li 已提交
58
  int32_t ret = 0;
M
Minghao Li 已提交
59
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
60
    SRaftId* pDestId = &(pSyncNode->peersId[i]);
M
Minghao Li 已提交
61

M
Minghao Li 已提交
62 63
    // set prevLogIndex
    SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65
    SyncIndex preLogIndex = nextIndex - 1;
M
Minghao Li 已提交
66

M
Minghao Li 已提交
67
    // set preLogTerm
M
Minghao Li 已提交
68 69
    SyncTerm preLogTerm = 0;
    if (preLogIndex >= SYNC_INDEX_BEGIN) {
M
Minghao Li 已提交
70
      SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
M
Minghao Li 已提交
71
      ASSERT(pPreEntry != NULL);
M
Minghao Li 已提交
72

M
Minghao Li 已提交
73
      preLogTerm = pPreEntry->term;
M
Minghao Li 已提交
74
      syncEntryDestory(pPreEntry);
M
Minghao Li 已提交
75
    }
M
Minghao Li 已提交
76

M
Minghao Li 已提交
77 78 79 80
    // batch optimized
    // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);

    SyncAppendEntries* pMsg = NULL;
81
    SSyncRaftEntry*    pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
M
Minghao Li 已提交
82
    if (pEntry != NULL) {
M
Minghao Li 已提交
83
      pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
M
Minghao Li 已提交
84
      ASSERT(pMsg != NULL);
M
Minghao Li 已提交
85

M
Minghao Li 已提交
86 87 88
      // add pEntry into msg
      uint32_t len;
      char*    serialized = syncEntrySerialize(pEntry, &len);
M
Minghao Li 已提交
89
      ASSERT(len == pEntry->bytes);
M
Minghao Li 已提交
90 91
      memcpy(pMsg->data, serialized, len);

wafwerar's avatar
wafwerar 已提交
92
      taosMemoryFree(serialized);
M
Minghao Li 已提交
93 94 95 96
      syncEntryDestory(pEntry);

    } else {
      // maybe overflow, send empty record
M
Minghao Li 已提交
97
      pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
M
Minghao Li 已提交
98
      ASSERT(pMsg != NULL);
M
Minghao Li 已提交
99
    }
M
Minghao Li 已提交
100

M
Minghao Li 已提交
101
    ASSERT(pMsg != NULL);
M
Minghao Li 已提交
102 103
    pMsg->srcId = pSyncNode->myRaftId;
    pMsg->destId = *pDestId;
M
Minghao Li 已提交
104
    pMsg->term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
105 106 107 108
    pMsg->prevLogIndex = preLogIndex;
    pMsg->prevLogTerm = preLogTerm;
    pMsg->commitIndex = pSyncNode->commitIndex;

M
Minghao Li 已提交
109 110
    syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg);

M
Minghao Li 已提交
111
    // send AppendEntries
M
Minghao Li 已提交
112
    syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
M
Minghao Li 已提交
113
    syncAppendEntriesDestroy(pMsg);
M
Minghao Li 已提交
114 115
  }

M
Minghao Li 已提交
116 117 118
  return ret;
}

119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex) {
  int32_t ret = 0;

  // pre index, pre term
  SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
  SyncTerm  preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
  if (preLogTerm == SYNC_TERM_INVALID) {
    SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
    // SyncIndex newNextIndex = nextIndex + 1;

    syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
    syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
    sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
           ", match-index:%d, raftid:%" PRId64,
           pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
    return -1;
  }

  // entry pointer array
  SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
  memset(entryPArr, 0, sizeof(entryPArr));

  // get entry batch
  int32_t   getCount = 0;
  SyncIndex getEntryIndex = nextIndex;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) {
    SSyncRaftEntry* pEntry = NULL;
    int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
    if (code == 0) {
      ASSERT(pEntry != NULL);
      entryPArr[i] = pEntry;
      getCount++;
      getEntryIndex++;

    } else {
      break;
    }
  }

  // event log
  do {
    char     logBuf[128];
    char     host[64];
    uint16_t port;
    syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
    snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port);
    syncNodeEventLog(pSyncNode, logBuf);
  } while (0);

  // build msg
  SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
  ASSERT(pMsg != NULL);

  // free entries
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) {
    SSyncRaftEntry* pEntry = entryPArr[i];
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
      entryPArr[i] = NULL;
    }
  }

  // prepare msg
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = *pDestId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->prevLogIndex = preLogIndex;
  pMsg->prevLogTerm = preLogTerm;
  pMsg->commitIndex = pSyncNode->commitIndex;
  pMsg->privateTerm = 0;
  pMsg->dataCount = getCount;

  // send msg
  syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg);

  // speed up
  if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) {
    ret = 1;

#if 0
      do {
        char     logBuf[128];
        char     host[64];
        uint16_t port;
        syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
        snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
        syncNodeEventLog(pSyncNode, logBuf);
      } while (0);
#endif
  }

  syncAppendEntriesBatchDestroy(pMsg);

  return ret;
}

/*
 * Replicate to every peer via syncNodeAppendEntriesOnePeer (SYNC_STRATEGY_WAL_FIRST path).
 *
 * Every peer is contacted even if an earlier one fails; per-peer failures are logged
 * by syncNodeAppendEntriesOnePeer itself.
 *
 * Returns:
 *   -1  this node is not the leader (nothing is sent), or no peer requested a
 *       speed-up and at least one peer failed.
 *    1  at least one peer requested a speed-up. This takes precedence over errors
 *       because syncNodeReplicate only acts on a positive result.
 *    0  otherwise.
 *
 * Previously only the last peer's result was returned, so a speed-up requested by an
 * earlier peer was silently dropped.
 */
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return -1;
  }

  bool speedUp = false;
  bool anyError = false;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SRaftId* pDestId = &(pSyncNode->peersId[i]);

    // next index
    SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
    int32_t   code = syncNodeAppendEntriesOnePeer(pSyncNode, pDestId, nextIndex);
    if (code > 0) {
      speedUp = true;
    } else if (code < 0) {
      anyError = true;
    }
  }

  if (speedUp) {
    return 1;
  }
  return anyError ? -1 : 0;
}

// (A disabled, fully inlined duplicate of syncNodeAppendEntriesPeersSnapshot2 was removed here;
//  the per-peer logic lives in syncNodeAppendEntriesOnePeer.)
339

340
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
341
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
342

343 344 345
  syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
  syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
  logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
346 347 348 349 350

  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SRaftId* pDestId = &(pSyncNode->peersId[i]);

351
    // next index
352
    SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
353 354 355 356

    // pre index, pre term
    SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
    SyncTerm  preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
357
    if (preLogTerm == SYNC_TERM_INVALID) {
358 359 360
      SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
      // SyncIndex newNextIndex = nextIndex + 1;

361 362
      syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
      syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
S
Shengliang Guan 已提交
363
      sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
364
             ", match-index:%d, raftid:%" PRId64,
365 366 367 368
             pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);

      return -1;
    }
369

370 371
    // prepare entry
    SyncAppendEntries* pMsg = NULL;
372

373 374
    SSyncRaftEntry* pEntry;
    int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
M
Minghao Li 已提交
375

376 377 378
    if (code == 0) {
      ASSERT(pEntry != NULL);

379 380
      pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
      ASSERT(pMsg != NULL);
381

382 383 384
      // add pEntry into msg
      uint32_t len;
      char*    serialized = syncEntrySerialize(pEntry, &len);
M
Minghao Li 已提交
385
      ASSERT(len == pEntry->bytes);
386
      memcpy(pMsg->data, serialized, len);
M
Minghao Li 已提交
387

388 389
      taosMemoryFree(serialized);
      syncEntryDestory(pEntry);
390 391

    } else {
392 393 394 395 396 397 398 399 400
      if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
        // no entry in log
        pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
        ASSERT(pMsg != NULL);

      } else {
        syncNodeLog3("", pSyncNode);
        ASSERT(0);
      }
401
    }
402 403 404 405 406 407 408 409 410

    // prepare msg
    ASSERT(pMsg != NULL);
    pMsg->srcId = pSyncNode->myRaftId;
    pMsg->destId = *pDestId;
    pMsg->term = pSyncNode->pRaftStore->currentTerm;
    pMsg->prevLogIndex = preLogIndex;
    pMsg->prevLogTerm = preLogTerm;
    pMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
411 412
    pMsg->privateTerm = 0;
    // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
413 414 415 416

    // send msg
    syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
    syncAppendEntriesDestroy(pMsg);
417 418 419 420
  }

  return ret;
}
M
Minghao Li 已提交
421

M
Minghao Li 已提交
422
int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer) {
M
Minghao Li 已提交
423
  // start replicate
M
Minghao Li 已提交
424 425
  int32_t ret = 0;

M
Minghao Li 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
  switch (pSyncNode->pRaftCfg->snapshotStrategy) {
    case SYNC_STRATEGY_NO_SNAPSHOT:
      ret = syncNodeAppendEntriesPeers(pSyncNode);
      break;

    case SYNC_STRATEGY_STANDARD_SNAPSHOT:
      ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
      break;

    case SYNC_STRATEGY_WAL_FIRST:
      ret = syncNodeAppendEntriesPeersSnapshot2(pSyncNode);
      break;

    default:
      ret = syncNodeAppendEntriesPeers(pSyncNode);
      break;
M
Minghao Li 已提交
442
  }
M
Minghao Li 已提交
443

M
Minghao Li 已提交
444 445 446 447 448 449 450 451 452
  // start delay
  int64_t timeNow = taosGetTimestampMs();
  int64_t startDelay = timeNow - pSyncNode->startTime;

  // replicate delay
  int64_t replicateDelay = timeNow - pSyncNode->lastReplicateTime;
  pSyncNode->lastReplicateTime = timeNow;

  if (ret > 0 && isTimer && startDelay > SYNC_SPEED_UP_AFTER_MS) {
M
Minghao Li 已提交
453
    // speed up replicate
M
Minghao Li 已提交
454 455
    int32_t ms =
        pSyncNode->heartbeatTimerMS < SYNC_SPEED_UP_HB_TIMER ? pSyncNode->heartbeatTimerMS : SYNC_SPEED_UP_HB_TIMER;
M
Minghao Li 已提交
456 457
    syncNodeRestartNowHeartbeatTimerMS(pSyncNode, ms);

M
Minghao Li 已提交
458 459 460 461 462 463 464 465
#if 0
    do {
      char logBuf[128];
      snprintf(logBuf, sizeof(logBuf), "replicate speed up");
      syncNodeEventLog(pSyncNode, logBuf);
    } while (0);
#endif

M
Minghao Li 已提交
466 467
  } else {
    syncNodeRestartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
468 469 470 471 472 473 474 475

#if 0
    do {
      char logBuf[128];
      snprintf(logBuf, sizeof(logBuf), "replicate slow down");
      syncNodeEventLog(pSyncNode, logBuf);
    } while (0);
#endif
M
Minghao Li 已提交
476
  }
477

M
Minghao Li 已提交
478 479
  return ret;
}
M
Minghao Li 已提交
480

M
Minghao Li 已提交
481 482 483 484 485 486 487 488 489
int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) {
  // next index
  SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);

  // maybe start snapshot
  SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  if (nextIndex < logStartIndex || nextIndex > logEndIndex) {
    // start snapshot
490 491
    int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
    ASSERT(code == 0);
M
Minghao Li 已提交
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
    return 0;
  }

  // pre index, pre term
  SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
  SyncTerm  preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);

  // prepare entry
  SyncAppendEntries* pMsg = NULL;

  SSyncRaftEntry* pEntry;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);

  if (code == 0) {
    ASSERT(pEntry != NULL);

    pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
    ASSERT(pMsg != NULL);

    // add pEntry into msg
    uint32_t len;
    char*    serialized = syncEntrySerialize(pEntry, &len);
    ASSERT(len == pEntry->bytes);
    memcpy(pMsg->data, serialized, len);

    taosMemoryFree(serialized);
    syncEntryDestory(pEntry);

  } else {
    if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // no entry in log
      pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
      ASSERT(pMsg != NULL);

    } else {
      syncNodeLog3("", pSyncNode);
      ASSERT(0);
    }
  }

  // prepare msg
  ASSERT(pMsg != NULL);
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = *pDestId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->prevLogIndex = preLogIndex;
  pMsg->prevLogTerm = preLogTerm;
  pMsg->commitIndex = pSyncNode->commitIndex;
  pMsg->privateTerm = 0;
  // pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);

  // send msg
  syncNodeMaybeSendAppendEntries(pSyncNode, pDestId, pMsg);
  syncAppendEntriesDestroy(pMsg);

  return 0;
}

/*
 * Run syncNodeDoAppendEntries for every peer.
 *
 * A per-peer failure is logged with the peer's host:port and does not stop the loop.
 * Returns -1 if this node is not the leader, otherwise 0.
 */
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) {
  syncNodeEventLog(pSyncNode, "do replicate");

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return -1;
  }

  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SRaftId* pDestId = &(pSyncNode->peersId[i]);
    int32_t  code = syncNodeDoAppendEntries(pSyncNode, pDestId);
    if (code != 0) {
      char     host[64];
      uint16_t port;  // syncUtilU642Addr writes a uint16_t (was int16_t: wrong pointer type, negative ports)
      syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
      sError("vgId:%d, do append entries error for %s:%d", pSyncNode->vgId, host, port);
    }
  }

  return 0;
}

/*
 * Log pMsg, send it to destRaftId as an RPC message, and then update the peer's
 * SPeerState:
 *   lastSendIndex = pMsg->prevLogIndex + 1
 *   lastSendTime  = now (ms)
 *
 * The peer state is expected to exist (asserted). Always returns 0.
 */
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
  syncLogSendAppendEntries(pSyncNode, pMsg, "");

  SRpcMsg outRpc;
  syncAppendEntries2RpcMsg(pMsg, &outRpc);
  syncNodeSendMsgById(destRaftId, pSyncNode, &outRpc);

  SPeerState* pPeer = syncNodeGetPeerState(pSyncNode, destRaftId);
  ASSERT(pPeer != NULL);

  SyncIndex sentIndex = pMsg->prevLogIndex + 1;
  pPeer->lastSendIndex = sentIndex;
  pPeer->lastSendTime = taosGetTimestampMs();

  return 0;
}

/*
 * Send pMsg via syncNodeSendAppendEntries only when syncNodeNeedSendAppendEntries
 * says the peer needs it.
 *
 * Returns 0 when the send is skipped; otherwise returns the send's result.
 */
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
  bool needed = syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg);
  if (!needed) {
    return 0;
  }
  return syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg);
}

M
Minghao Li 已提交
597 598
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
  int32_t ret = 0;
M
Minghao Li 已提交
599
  syncLogSendAppendEntries(pSyncNode, pMsg, "");
600

M
Minghao Li 已提交
601 602 603 604
  SRpcMsg rpcMsg;
  syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
  return ret;
605 606 607 608
}

/*
 * Log a batched AppendEntries message and send it to destRaftId as an RPC message.
 * Always returns 0.
 */
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
                                   const SyncAppendEntriesBatch* pMsg) {
  syncLogSendAppendEntriesBatch(pSyncNode, pMsg, "");

  SRpcMsg batchRpc;
  syncAppendEntriesBatch2RpcMsg(pMsg, &batchRpc);
  syncNodeSendMsgById(destRaftId, pSyncNode, &batchRpc);

  return 0;
}

/*
 * Log a heartbeat message and send it as an RPC message.
 *
 * The message is routed to pMsg->destId; destRaftId is not used (the caller in this
 * file passes &pMsg->destId, so the two agree). Always returns 0.
 */
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) {
  (void)destRaftId;
  syncLogSendHeartbeat(pSyncNode, pMsg, "");

  SRpcMsg heartbeatRpc;
  syncHeartbeat2RpcMsg(pMsg, &heartbeatRpc);
  syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &heartbeatRpc);

  return 0;
}

/*
 * Send a heartbeat to every peer. Each heartbeat carries this node's id, current term
 * and commitIndex; privateTerm is set to 0.
 *
 * A peer whose heartbeat message cannot be built is logged and skipped.
 * Always returns 0.
 */
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
    if (pSyncMsg == NULL) {
      sError("vgId:%d, failed to build heartbeat msg", pSyncNode->vgId);
      continue;
    }

    pSyncMsg->srcId = pSyncNode->myRaftId;
    pSyncMsg->destId = pSyncNode->peersId[i];
    pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
    pSyncMsg->commitIndex = pSyncNode->commitIndex;
    pSyncMsg->privateTerm = 0;

    // send msg. syncNodeHeartbeat performs the RPC conversion itself. Converting here
    // as well (as before) produced an SRpcMsg that was never sent or released.
    syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);

    syncHeartbeatDestroy(pSyncMsg);
  }

  return 0;
}