syncMain.c 87.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
45
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
46
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
47 48 49
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
50 51 52 53 54 55 56 57 58 59 60
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
61

62
/*
 * Creates a sync node from pSyncInfo and registers it via syncNodeAdd().
 *
 * On success the timer baselines and the message callback are copied from pSyncInfo
 * and the node's rid (the handle used by every other sync* API) is returned.
 * Returns -1 if the node cannot be opened or registered; a node that was opened but
 * not registered is closed again.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  int64_t rid = syncNodeAdd(pNode);
  pNode->rid = rid;
  if (rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // copy timing parameters and the message callback table from the caller
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  return rid;
}
M
Minghao Li 已提交
83

B
Benguang Zhao 已提交
84
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
85
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
86
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
87
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
88 89 90 91
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
92
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
93
    goto _err;
M
Minghao Li 已提交
94
  }
M
Minghao Li 已提交
95

B
Benguang Zhao 已提交
96 97 98 99
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
100

B
Benguang Zhao 已提交
101 102
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
103

104 105 106
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
107 108
}

M
Minghao Li 已提交
109
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
110
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
111
  if (pSyncNode != NULL) {
112
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
113
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
114
    syncNodeRemove(rid);
M
Minghao Li 已提交
115 116 117
  }
}

M
Minghao Li 已提交
118 119
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
120 121 122
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
123 124 125
  }
}

126 127 128 129 130 131 132 133
/* Runs syncNodePostClose() on the node identified by rid, if it can be acquired. */
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
134 135 136
/*
 * Validates a proposed configuration against the current node.
 *
 * The local node must appear in pCfg, and the replica count may change by at most
 * one in either direction.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int32_t delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}

S
Shengliang Guan 已提交
139
/*
 * Applies a new sync configuration to the node identified by rid.
 *
 * - If the node's lastConfigIndex is already >= pNewCfg->lastIndex, nothing is changed
 *   and 0 is returned.
 * - If syncNodeCheckNewConfig() rejects pNewCfg, terrno is set to
 *   TSDB_CODE_SYN_NEW_CONFIG_ERROR and -1 is returned.
 * - Otherwise the config index is updated and the config change is applied. On a
 *   leader, the per-peer heartbeat timers are re-initialized and the heartbeat timer
 *   is restarted.
 *
 * All logging that touches the node happens before syncNodeRelease(): once the
 * reference is dropped the node must not be dereferenced.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // set after release so the release path cannot overwrite the reported error
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // peer ids may have changed with the new config: rebuild every per-peer timer slot
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    // syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
174

S
Shengliang Guan 已提交
175 176 177 178
/*
 * Dispatches an incoming sync message to the handler for its msgType.
 *
 * Returns -1 if the sync env is not initialized or rid cannot be acquired. For an
 * unknown msgType, terrno is set to TSDB_CODE_MSG_NOT_PROCESSED and -1 is returned.
 * Otherwise the handler's return code is returned.
 *
 * The failure log is emitted before syncNodeRelease(): the node must not be
 * dereferenced once its reference has been dropped.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
235
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
236
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
237
  if (pSyncNode == NULL) return -1;
238

S
Shengliang Guan 已提交
239
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
240
  syncNodeRelease(pSyncNode);
241 242 243
  return ret;
}

C
cadem 已提交
244 245
/*
 * Forces the node into follower state and acknowledges the request.
 *
 * The response reuses the response buffer and info carried by pRpcMsg->info and is
 * sent with code 0. Always returns 0.
 */
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}

258
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
259
  SSyncNode* pNode = syncNodeAcquire(rid);
260
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
261 262

  SRpcMsg rpcMsg = {0};
263
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
264 265 266
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
267
  if (ret == 1) {
268
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
269
    rpcSendResponse(&rpcMsg);
270 271
    return 0;
  } else {
272
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
273
    return -1;
274
  }
S
Shengliang Guan 已提交
275 276
}

M
Minghao Li 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
/*
 * Returns the smallest match index recorded for any peer, or SYNC_INDEX_INVALID
 * when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; peer++) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    lowest = (candidate < lowest) ? candidate : lowest;
  }
  return lowest;
}

293
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
295
  if (pSyncNode == NULL) {
296
    sError("sync begin snapshot error");
297 298
    return -1;
  }
299

300 301 302 303 304 305 306 307 308 309
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

310
  int32_t code = 0;
311
  int64_t logRetention = 0;
312

M
Minghao Li 已提交
313
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
314
    // mnode
315
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
316 317 318 319
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
320 321 322
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
323

C
cadem 已提交
324
  if (pSyncNode->totalReplicaNum > 1) {
325 326
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
327 328 329 330
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
331
    }
332
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
333 334
  }

M
Minghao Li 已提交
335
_DEL_WAL:
336

M
Minghao Li 已提交
337
  do {
338 339 340 341 342 343 344 345 346 347 348
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

349
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
350 351 352 353 354 355 356 357
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
358

M
Minghao Li 已提交
359
      } else {
360 361
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
362
      }
363
    }
M
Minghao Li 已提交
364
  } while (0);
365

S
Shengliang Guan 已提交
366
  syncNodeRelease(pSyncNode);
367 368 369 370
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
371
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
372
  if (pSyncNode == NULL) {
373
    sError("sync end snapshot error");
374 375 376
    return -1;
  }

377 378 379 380
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
381
    if (code != 0) {
382
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
383
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
384 385
      return -1;
    } else {
S
Shengliang Guan 已提交
386
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
387 388
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
389
  }
390

S
Shengliang Guan 已提交
391
  syncNodeRelease(pSyncNode);
392 393 394
  return code;
}

M
Minghao Li 已提交
395
/*
 * Calls syncNodeStepDown(newTerm) on the node identified by rid.
 * Returns 0 on success, -1 if rid cannot be acquired.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);
  return 0;
}

407
/*
 * Reports whether the node may serve reads: it must be leader and have finished restoring.
 *
 * On a false result terrno explains why:
 * - TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node;
 * - TSDB_CODE_SYN_NOT_LEADER when the node is not leader;
 * - TSDB_CODE_SYN_RESTORING while restore is in progress.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  int32_t reason = 0;
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    reason = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!pSyncNode->restoreFinish) {
    reason = TSDB_CODE_SYN_RESTORING;
  }

  if (reason != 0) {
    terrno = reason;
    return false;
  }
  return true;
}

/* rid-based wrapper around syncNodeIsReadyForRead(); false if rid cannot be acquired. */
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync ready for read error");
    return false;
  }

  bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);
  return isReady;
}
M
Minghao Li 已提交
439

440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
/* Returns syncNodeSnapshotSending() for the node identified by rid; false if it cannot be acquired. */
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

/* Returns syncNodeSnapshotRecving() for the node identified by rid; false if it cannot be acquired. */
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}

M
Minghao Li 已提交
462 463
/*
 * Starts a leader transfer away from this node.
 *
 * Returns 0 without doing anything when:
 * - there are no peers;
 * - this node is not leader;
 * - this node has only one replica.
 *
 * The target is peer 0, unless there are exactly two peers and peer 1 has a strictly
 * higher match index. Returns the result of syncNodeLeaderTransferTo() otherwise.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  bool multiReplicaLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->replicaNum > 1);
  if (!multiReplicaLeader) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    target = (second > first) ? 1 : 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}

M
Minghao Li 已提交
484 485
/*
 * Proposes a leader-transfer entry naming newLeader as the next leader.
 *
 * Returns -1 if the node has a single replica or the leader-transfer message cannot
 * be built. Otherwise returns the result of syncNodePropose(). The message buffer is
 * freed after proposing.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // the build can fail (e.g. allocation); pCont would then be NULL and must not be written to
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

505 506
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
507

S
Shengliang Guan 已提交
508
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
509 510 511
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
512 513 514 515 516
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
517
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
518 519
  }

520
  return state;
M
Minghao Li 已提交
521 522
}

523
/*
 * Picks the config index that applies to a snapshot ending at snapshotLastApplyIndex.
 *
 * Starts from configIndexArr[0]. Returns the largest recorded config index that is
 * greater than that starting value and not beyond snapshotLastApplyIndex (or
 * configIndexArr[0] if none qualifies). Requires at least one recorded config index.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = pSyncNode->raftCfg.configIndexArr[0];
  // element 0 is the starting value, so comparison begins at element 1
  for (int32_t k = 1; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = pSyncNode->raftCfg.configIndexArr[k];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

539 540
/*
 * Fills pEpSet with the endpoints of the non-learner replicas of the node identified by rid.
 *
 * Endpoints are appended densely: eps[0..numOfEps) are all valid, even when learners
 * are skipped, and never more than eps[] can hold. inUse points at the entry after
 * this node's own config index (mod numOfEps).
 *
 * numOfEps is 0 if rid cannot be acquired.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  // totalReplicaNum includes learners and may exceed eps[] capacity (peer arrays in this
  // file are sized TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) — never write past it.
  const int32_t maxEps = (int32_t)(sizeof(pEpSet->eps) / sizeof(pEpSet->eps[0]));

  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
    if (pEpSet->numOfEps >= maxEps) break;

    // index by the number of endpoints already written, not by i, so skipped learners leave no holes
    SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}

S
Shengliang Guan 已提交
561
/*
 * rid-based wrapper around syncNodePropose().
 * Returns -1 if rid cannot be acquired, otherwise syncNodePropose()'s result.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
M
Minghao Li 已提交
572

C
cadem 已提交
573 574 575 576 577 578 579
int32_t syncIsCatchUp(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

C
cadem 已提交
580
  int32_t isCatchUp = 0;
581
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
C
cadem 已提交
582
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
583 584 585 586
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
          pSyncNode->pLogBuf->matchIndex);
C
cadem 已提交
587
    isCatchUp = 0;
588 589 590
  } else {
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
C
cadem 已提交
591
    isCatchUp = 1;
C
cadem 已提交
592
  }
593

C
cadem 已提交
594
  syncNodeRelease(pSyncNode);
C
cadem 已提交
595 596 597 598 599 600 601 602 603 604 605
  return isCatchUp;
}

/*
 * Returns this node's role as recorded in its own config entry.
 * Returns -1 if rid cannot be acquired.
 */
ESyncRole syncGetRole(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

  int32_t   self = pSyncNode->raftCfg.cfg.myIndex;
  ESyncRole myRole = pSyncNode->raftCfg.cfg.nodeInfo[self].nodeRole;

  syncNodeRelease(pSyncNode);
  return myRole;
}

S
Shengliang Guan 已提交
611
/*
 * Proposes pMsg on this node.
 *
 * Fails with -1 (terrno set) when:
 * - the node is not leader;
 * - a vnode (vgId != 1) has not finished restoring;
 * - heartbeat replies have timed out.
 *
 * If syncNodeIsOptimizedOneReplica() accepts the message, it is applied directly via
 * syncNodeOnClientRequest(): on success applyIndex/applyTerm are written into
 * pMsg->info.conn and 1 is returned; on failure -1 is returned.
 *
 * Otherwise the message is registered with the response manager, wrapped in a client
 * request and enqueued through syncEqMsg. *seq (if non-NULL) receives the sequence
 * number and the enqueue result is returned.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    } else {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }
  } else {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
    SRpcMsg   rpcMsg = {0};
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
      return -1;
    }

    sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    }

    if (seq != NULL) *seq = seqNum;
    return code;
  }
}

S
Shengliang Guan 已提交
673
/*
 * Resets a per-peer heartbeat timer descriptor.
 *
 * The descriptor targets destId, fires syncNodeEqPeerHeartbeatTimer every hbBaseLine ms,
 * and has no armed timer. Its counter and logic clock are zero and its timestamp is now.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
684
/*
 * Arms the heartbeat timer for one peer.
 *
 * The timer callback is given the rid of an SSyncHbTimerData record rather than a raw
 * pointer. The record referenced by pSyncTimer->hbDataRid is reused when it can be
 * acquired; otherwise a new one is allocated and registered.
 *
 * Returns 0 on success, and also (as before) when the sync env is not initialized.
 * Returns -1 if a new record cannot be allocated or registered.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to alloc hb timer data since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
      if (pData->rid < 0) {
        // registration failed, so the ref table does not own pData: free it here
        sError("vgId:%d, failed to add hb timer data since %s", pSyncNode->vgId, terrstr());
        taosMemoryFree(pData);
        return -1;
      }
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);

    // the timer fires in HEARTBEAT_TICK_NUM sub-ticks per heartbeat period
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
712
/*
 * Disarms a per-peer heartbeat timer and removes its timer data record.
 *
 * The logic clock is bumped first (presumably so a callback already in flight can tell
 * it is stale — confirm against the callback). The timer is then stopped and the
 * SSyncHbTimerData referenced by hbDataRid is removed. Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

722
/*
 * Rebuilds the log store from the FSM snapshot when the log no longer lines up with it.
 *
 * The log is considered aligned when its last index is at least the snapshot's
 * lastApplyIndex and its first index is at most lastApplyIndex + 1. Otherwise
 * syncLogRestoreFromSnapshot() is invoked. Returns -1 if that restore reports failure
 * (non-zero), else 0.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool aligned = (lastVer >= commitIndex) && (firstVer <= commitIndex + 1);
  if (aligned) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
742
// open/close --------------
S
Shengliang Guan 已提交
743 744
/*
 * Create a sync node and initialize it from pSyncInfo.
 *
 * In order:
 * - make sure the data directory exists;
 * - create or load raft_config.json;
 * - refresh dnode endpoint info;
 * - derive the self, peer and replica raft ids;
 * - open the raft store;
 * - create the vote, index, log-store and response helpers;
 * - restore the log store against the FSM snapshot;
 * - set up the timers and the snapshot senders/receiver;
 * - set up the replication manager, peer state and the log ring buffer.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node (the field is set to NULL).
 * When the existing config file is kept, pSyncInfo->syncCfg is overwritten with it.
 *
 * Returns the new node, or NULL on failure. On failure, everything created so far
 * is released through syncNodeClose(). pSyncInfo->pFsm is freed if it had not yet
 * been moved into the node.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId before anything logs with it. The raft-config handling below used to
  // print "vgId:0" because the assignment only happened after it.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file from the input options
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    // A non-empty input config that differs from the file wins and is persisted.
    // Otherwise the file's config is handed back to the caller through pSyncInfo.
    if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo (vgId is already set above; re-applied so the input value stays authoritative)
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  // persist the config again if any dnode endpoint info was refreshed above
  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId: every configured node except myself
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; from here on the FSM is owned by the node (freed by syncNodeClose)
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts at the FSM snapshot's last applied index, if there is one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // Previously `return NULL` here leaked the node and everything opened above.
      // syncNodeClose() skips the sender slots that are still NULL.
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // same leak as above when this returned NULL directly
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // the raft role itself is chosen later, in syncNodeStart()

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by the caller if the failure happened before it was moved
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1073 1074
/*
 * Raise the node's commit index to the FSM snapshot's last applied index when the
 * snapshot is ahead. It is a no-op when no FSM or snapshot-info callback is registered.
 * The commit index is never lowered.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1083
/*
 * Bring the node's commit state in line with its log store and commit the log buffer.
 *
 * The WAL must be non-empty and end exactly one before the log buffer's endIndex.
 * Otherwise terrno is set to TSDB_CODE_WAL_LOG_INCOMPLETE and -1 is returned.
 * The node's commit index becomes the larger of its current value and the log
 * store's commit index. The log buffer is then committed up to it.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool walIsEmpty = (lastVer == -1);
  if (!walIsEmpty && endIndex != lastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  // never move the commit index backwards
  if (commitIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = commitIndex;
  }
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) ? -1 : 0;
}

/*
 * Put the node into its initial raft role and start the ping timer.
 *
 * Roles:
 * - learner: a node whose own config entry has role TAOS_SYNC_ROLE_LEARNER;
 * - leader: a single-replica group advances its term, becomes leader and appends
 *   a no-op entry;
 * - follower: everyone else.
 *
 * Returns the result of syncNodeStartPingTimer().
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  const SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool            isLearner = (pCfg->nodeInfo[pCfg->myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  } else {
    syncNodeBecomeFollower(pSyncNode, "first start");
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return ret;
}

B
Benguang Zhao 已提交
1132
/*
 * Start the node in standby mode.
 *
 * The node is forced to follower state and its heartbeat timers are stopped.
 * The election timer is re-armed with TIMER_MAX_MS, the longest timeout available.
 * The ping timer is then started.
 *
 * Returns -1 if either timer fails to start, otherwise the ping-timer result.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  if (syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  if (pingRet < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return pingRet;
}

M
Minghao Li 已提交
1153
/*
 * First phase of shutting a node down.
 *
 * Stops the election, heartbeat and ping timers, in that order. Then cleans the
 * response manager via syncRespCleanRsp(). The node must have an FSM with
 * FpApplyQueueItems registered; this is asserted.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeStopPingTimer(pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1171 1172 1173
/*
 * Post-close teardown: stop the snapshot receiver held in pNewNodeReceiver (if it is
 * running) and destroy it. The field is left NULL, so calling this again is harmless.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // This runs in post-close; the message used to say "preclose", which misled log readers.
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1184
/* Free a heartbeat-timer data object with taosMemoryFree(). */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1185

M
Minghao Li 已提交
1186
/*
 * Tear down a sync node and free it. A NULL node is ignored.
 *
 * The teardown order is:
 * 1. clean pending responses;
 * 2. stop the ping, election and heartbeat timers;
 * 3. destroy the replication manager and the raft helper objects;
 * 4. destroy the snapshot senders, stopping any that are still running;
 * 5. stop and destroy the new-node snapshot receiver;
 * 6. free the FSM and close the raft store;
 * 7. free the node itself.
 *
 * The function is also used on the error path of syncNodeOpen(), so some members
 * may still be NULL.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[i];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);
    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }
    snapshotSenderDestroy(pSender);
    pSyncNode->senders[i] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1244
/* Snapshot strategy stored in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1245

M
Minghao Li 已提交
1246 1247 1248
// timer control --------------
/*
 * Arm (or re-arm) the ping timer for pingTimerMS with the node pointer as the
 * timer parameter. The user-side logic clock is then copied into
 * pingTimerLogicClock. If the sync environment is not initialized, an error is
 * logged and nothing is armed. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Stop the ping timer. The user-side logic clock (pingTimerLogicClockUser) is
 * advanced before the timer is cancelled. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * The timeout is recorded in electTimerMS. The due time and current logic clock are
 * recorded in electTimerParam. The timer is armed with the node's rid (cast to void*)
 * as its parameter. If the sync environment is not initialized, an error is logged
 * and nothing changes. Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t dueAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), dueAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Stop the election timer. electTimerLogicClock is advanced first; the start path
 * snapshots this clock into electTimerParam. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/* Stop the election timer and re-arm it for `ms` milliseconds. Always returns 0. */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1303
/*
 * Re-arm the election timer with a fresh timeout.
 *
 * Standby nodes use TIMER_MAX_MS. All other nodes pick a random value in
 * [electBaseLine, 2 * electBaseLine] via syncUtilElectRandomMS().
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1318
/*
 * Arm the node-wide heartbeat timer for heartbeatTimerMS with the node pointer as
 * the timer parameter, then copy heartbeatTimerLogicClockUser into
 * heartbeatTimerLogicClock. If the sync environment is not initialized, only an
 * error is logged. The trace line is emitted in both cases. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1332
/*
 * Start the heartbeat timer of every peer.
 *
 * Each peer's timer is looked up with syncNodeGetHbTimer(); peers without one are
 * skipped. The legacy node-wide heartbeat timer path was compiled out with `#if 0`
 * and has been removed as dead code. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
    if (pSyncTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pSyncTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1350 1351
/*
 * Stop the heartbeat timer of every peer.
 *
 * Each peer's timer is looked up with syncNodeGetHbTimer(); peers without one are
 * skipped. The legacy node-wide heartbeat timer stop code was compiled out with
 * `#if 0` and has been removed as dead code. Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
    if (pSyncTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pSyncTimer);
  }

  return 0;
}

1369 1370 1371 1372 1373 1374
/* Stop and then restart all per-peer heartbeat timers. Always returns 0. */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1375 1376 1377 1378 1379 1380 1381 1382
/*
 * Send pMsg to the peer whose raft id address matches destRaftId->addr.
 *
 * The endpoint set is taken from the first matching peer. When both an endpoint set
 * and the syncSendMSg callback are available, the message body is converted with
 * syncUtilMsgHtoN(), marked noResp and sent.
 *
 * Returns the callback's result, or -1 if the message could not be sent. On any
 * negative result (including "peer not found"), an error is logged, the body is
 * freed with rpcFreeCont() and terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum && epSet == NULL; ++i) {
    if (pNode->peersId[i].addr == destRaftId->addr) {
      epSet = &pNode->peersEpset[i];
    }
  }

  int32_t code = -1;
  if (epSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return code;
}

1401
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked two ways: by endpoint (FQDN + port equal to myNodeInfo) and
 * by raft id (SYNC_ADDR of each entry combined with this node's vgId, compared with
 * myRaftId). Both checks must agree, which is asserted. Returns the endpoint-based
 * result.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEp = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundByEp; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    foundByEp = strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                pInfo->nodePort == pNode->myNodeInfo.nodePort;
  }

  bool foundById = false;
  for (int32_t i = 0; i < pCfg->totalReplicaNum && !foundById; ++i) {
    SRaftId raftId = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    if (syncUtilSameId(&raftId, &pNode->myRaftId)) {
      foundById = true;
    }
  }

  ASSERT(foundByEp == foundById);
  return foundByEp;
}

1429
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
C
cadem 已提交
1430
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1431
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
C
cadem 已提交
1432
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1433 1434 1435 1436
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
1437
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1438 1439 1440 1441 1442
  }

  return false;
}

M
Minghao Li 已提交
1443
// Apply a new raft membership configuration to pSyncNode.
//
// - Returns immediately (with an info log) when syncIsConfigChanged reports no change.
// - Otherwise it installs *pNewConfig into pSyncNode->raftCfg and records lastConfigChangeIndex.
//   It then bumps configChangeNum, updates raftCfg.isStandBy and registers the config index
//   via syncAddCfgIndex.
// - If this node is a member of the new config, it rebuilds the derived membership state:
//   myNodeInfo/myRaftId, peers, replicasId, quorum, and the index/vote managers.
// - In that case it also re-maps the snapshot senders:
//   - A sender whose raft id is still present moves to that id's new slot.
//   - Empty slots get a freshly created sender.
//   - Senders of members that disappeared are destroyed.
// - The config is persisted with syncWriteCfgFile in both the member and non-member case.
//
// pNewConfig is copied by value; ownership stays with the caller.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // previously hard-coded as "vgId:1", which mislabelled every non-mnode vgroup
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // Log the beginning of the config change. sNInfo receives the node itself, so vgId must
  // not be passed as a format argument. Previously it was, which shifted every argument:
  // an int32 was read by PRId64 and an int64 by %d.
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the raft ids of the slots they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // a slot may be empty (sender creation can fail and is retried later); the sender
      // logger must not be handed a NULL sender
      if (oldSenders[i] != NULL) {
        sSTrace(oldSenders[i], "snapshot sender save old");
      }
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId: every configured node except myIndex is a peer
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear every slot first
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // move each surviving sender (matched by raft id) into its new slot
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; must not be destroyed below

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], true);
          break;
        }
      }
    }

    // create new senders for slots that are still empty
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot. Log through the node: the sender
          // pointer is NULL here and must not be passed to the sender logger.
          sNError(pSyncNode, "snapshot sender create failed while reconfig, replica-index:%d", i);
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders whose node is no longer in the config
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
}

M
Minghao Li 已提交
1617 1618
// raft state change --------------
// Adopt a newer term.
//
// When term is strictly greater than the stored current term, this persists it and
// becomes follower (the reason string records the new term). It then clears the stored
// vote. A term that is not newer is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1628
// Persist term as the current term if it is newer than the stored one.
// Unlike syncNodeUpdateTerm, the node's role and its stored vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (storedTerm < term) {
    raftStoreSetTerm(pSyncNode, term);
  }
}

M
Minghao Li 已提交
1634
// Step down in response to a message carrying newTerm.
//   newTerm <  current: ignored (trace only).
//   newTerm == current: become follower unless already a follower.
//   newTerm >  current: persist newTerm, become follower, clear the stored vote.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);

  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (newTerm == currentTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // newTerm is strictly newer
  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1659 1660
// Flush outstanding client responses through the node's response manager.
// syncNodeBecomeFollower calls this ("send rsp to client") when the role changes.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1661
// Switch pSyncNode to the follower role. Steps, in order:
//   1. Drop the cached leader id if we were leader, and reset hbSlowNum.
//   2. Set state = FOLLOWER and stop the heartbeat timer.
//   3. Flush pending client responses and notify the FSM (FpBecomeFollowerCb), if set.
//   4. Reset minMatchIndex and the log buffer, then re-arm the election timer.
// debugStr is only used in the trace log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;  // we are no longer the leader to point at
  }
  pSyncNode->hbSlowNum = 0;

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // answer clients still waiting on this node
  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasFollowerCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL;
  if (hasFollowerCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  syncNodeResetElectTimer(pSyncNode);
}

C
cadem 已提交
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714
// Switch pSyncNode to the learner role.
//
// Resets hbSlowNum, sets state = LEARNER and notifies the FSM (FpBecomeLearnerCb) if set.
// It then resets minMatchIndex and the log buffer. debugStr is only used in the trace log.
//
// NOTE(review): unlike syncNodeBecomeFollower, this does not stop the heartbeat timer, clear
// leaderCache, flush client responses, or re-arm the election timer. Confirm that is intended
// if a leader or candidate can ever be turned into a learner.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasLearnerCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLearnerCb != NULL;
  if (hasLearnerCb) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
}

M
Minghao Li 已提交
1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1733
// Transition pSyncNode into the leader role. Steps, in order:
//   1. Record leaderTime and becomeLeaderNum, and reset hbrSlowNum and restoreFinish.
//   2. Set state = LEADER and point leaderCache at ourselves.
//   3. Set nextIndex[i] = last index + 1 and matchIndex[i] = SYNC_INDEX_INVALID for every
//      replica, then reset per-peer send state.
//   4. Stop a running new-node snapshot receiver.
//   5. Swap the election timer for the heartbeat timer and send an immediate heartbeat.
//   6. Notify the FSM (FpBecomeLeaderCb) if set, reset minMatchIndex and the log buffer.
// debugStr is only used in the final info log.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex[i] = last index + 1 for every replica. The last index/term do not depend on
  // i, so query them once. Previously the snapshot info and log store were re-queried on
  // every iteration.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Promote a candidate to leader once a majority of votes has been granted.
//
// Without a majority it only logs an error. With one, it becomes leader and appends a no-op
// entry in the new term; a failed append is logged but not fatal. It then logs the
// resulting term, commit index and last log index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(lastIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}

M
Minghao Li 已提交
1829 1830
// Return true when this sync node belongs to vgroup 1, which this module treats as the mnode.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}

M
Minghao Li 已提交
1831
// Reset the per-peer send bookkeeping in every replica slot (voters and learners):
// lastSendIndex becomes SYNC_INDEX_INVALID and lastSendTime becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  for (int32_t slot = 0; slot < slotCount; slot++) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
  }
  return 0;
}

// Move a follower into the candidate role. Only the state field changes here; the
// transition is logged with the current term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "follower to candidate");
}

// Demote a leader to follower via syncNodeBecomeFollower, then log the term,
// commit index and last log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "leader to follower");
}

// Return a candidate to follower via syncNodeBecomeFollower, then log the term,
// commit index and last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1870 1871
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1872
// Record a vote for pRaftId in the raft store.
// Preconditions (asserted): term is the stored current term, and no vote was cast yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  ASSERT(term == storedTerm);

  bool alreadyVoted = raftStoreHasVoted(pSyncNode);
  ASSERT(!alreadyVoted);

  raftStoreVote(pSyncNode, pRaftId);
}

M
Minghao Li 已提交
1880
// simulate get vote from outside
1881 1882
// Vote for ourselves in currentTerm and count that vote.
//
// The vote is first recorded in the raft store. A granted RequestVoteReply from ourselves
// to ourselves is then built and fed to the granted/respond vote managers. If the reply
// cannot be built, the function returns after only the store-side vote. The temporary
// message is freed at the end.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg rpcMsg = {0};
  if (syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = rpcMsg.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  rpcFreeCont(rpcMsg.pCont);
}

M
Minghao Li 已提交
1899
// return if has a snapshot
M
Minghao Li 已提交
1900 1901
// Return true when the FSM exposes snapshot info and its lastApplyIndex is at least
// SYNC_INDEX_BEGIN. Returns false when FpGetSnapshotInfo is not provided.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
1912 1913
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1914
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1915
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1916 1917
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1918 1919 1920 1921 1922 1923 1924
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1925 1926
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1927 1928
// Return the term of the newest entry, looking at both the snapshot and the log.
//
// If a snapshot exists (lastApplyIndex >= SYNC_INDEX_BEGIN) and the log does not extend
// past it, the snapshot's lastApplyTerm is returned. In every other case the log store's
// last term is returned, which may be SYNC_TERM_INVALID on error.
//
// The snapshot info is fetched exactly once. Previously it was read twice: once inside
// syncNodeHasSnapshot and again here. The has-snapshot decision and the index comparison
// could therefore see two different snapshots.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  bool hasSnapshot = snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
  if (hasSnapshot) {
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex <= snapshot.lastApplyIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  // no snapshot, or the log is ahead of it
  return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex / syncNodeGetLastTerm
// (both account for the snapshot). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1957

M
Minghao Li 已提交
1958
// return append-entries first try index
M
Minghao Li 已提交
1959 1960 1961 1962 1963
// The first index to try when replicating: one past the last index known to this node
// (log or snapshot, whichever is larger).
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { return 1 + syncNodeGetLastIndex(pSyncNode); }

M
Minghao Li 已提交
1964 1965
// if index > 0, return index - 1
// else, return -1
1966 1967 1968 1969 1970 1971 1972 1973 1974
// Return index - 1, clamped so the result never drops below SYNC_INDEX_INVALID.
// pSyncNode is unused; it is kept for interface symmetry with syncNodeGetPreTerm.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex candidate = index - 1;
  return (candidate < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : candidate;
}

M
Minghao Li 已提交
1975 1976 1977 1978
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1979 1980 1981 1982 1983 1984 1985 1986 1987
// Return the term of the entry just before index.
//   index <  SYNC_INDEX_BEGIN -> SYNC_TERM_INVALID
//   index == SYNC_INDEX_BEGIN -> 0
//   otherwise                 -> term of entry (index - 1)
//
// Lookup order for (index - 1):
//   1. The log store's LRU cache. On a hit the handle is released afterwards.
//   2. On a miss, syncLogGetEntry; the returned entry is destroyed afterwards.
//   3. If the log lookup fails, the FSM snapshot, when its lastApplyIndex is exactly
//      (index - 1).
// If none of these succeeds, an error is logged and SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  int32_t         code = 0;

  LRUHandle* h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  if (h != NULL) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    SyncTerm preTerm = pPreEntry->term;
    // cached entries belong to the cache; entries read from the store are ours to free
    if (h != NULL) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }
    return preTerm;
  }

  // not in the log: the previous entry may be the last one covered by the snapshot
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2038 2039 2040 2041

// get pre index and term of "index"
// Fill *pPreIndex with syncNodeGetPreIndex(index) and *pPreTerm with syncNodeGetPreTerm(index).
// Both helpers take the original index and step back by one themselves. Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = preIndex;
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}

M
Minghao Li 已提交
2046
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2047
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2048

S
Shengliang Guan 已提交
2049 2050 2051
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2052
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2053 2054
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2055
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2056 2057
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2058
    }
M
Minghao Li 已提交
2059

M
Minghao Li 已提交
2060
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2061 2062
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2063
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2064 2065
      rpcFreeCont(rpcMsg.pCont);
      return;
2066
    }
M
Minghao Li 已提交
2067

S
Shengliang Guan 已提交
2068
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2069
  }
M
Minghao Li 已提交
2070 2071
}

M
Minghao Li 已提交
2072
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2073
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2074

M
Minghao Li 已提交
2075 2076
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2077

2078
  if (pNode == NULL) return;
M
Minghao Li 已提交
2079 2080 2081 2082 2083

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2084

2085
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2086 2087 2088 2089
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2090

S
Shengliang Guan 已提交
2091
  SRpcMsg rpcMsg = {0};
2092 2093
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2094

S
Shengliang Guan 已提交
2095
  if (code != 0) {
M
Minghao Li 已提交
2096
    sError("failed to build elect msg");
M
Minghao Li 已提交
2097
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2098
    return;
M
Minghao Li 已提交
2099 2100
  }

S
Shengliang Guan 已提交
2101
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2102
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2103 2104 2105

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2106
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2107
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2108
    syncNodeRelease(pNode);
2109
    return;
M
Minghao Li 已提交
2110
  }
M
Minghao Li 已提交
2111 2112

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2113 2114
}

M
Minghao Li 已提交
2115
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2116
  if (!syncIsInit()) return;
2117

S
Shengliang Guan 已提交
2118
  SSyncNode* pNode = param;
C
cadem 已提交
2119
  if (pNode->totalReplicaNum > 1) {
S
Shengliang Guan 已提交
2120 2121
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2122
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2123 2124 2125
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2126
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2127
        return;
2128
      }
M
Minghao Li 已提交
2129

2130
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2131 2132
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2133
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2134 2135
        rpcFreeCont(rpcMsg.pCont);
        return;
2136
      }
S
Shengliang Guan 已提交
2137 2138 2139 2140

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2141
    } else {
S
Shengliang Guan 已提交
2142 2143
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2144
    }
M
Minghao Li 已提交
2145 2146 2147
  }
}

2148
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2149
  int64_t hbDataRid = (int64_t)param;
2150
  int64_t tsNow = taosGetTimestampMs();
2151

2152 2153
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
C
cadem 已提交
2154
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2155 2156
    return;
  }
2157

2158
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2159
  if (pSyncNode == NULL) {
2160
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2161
    sError("hb timer get pSyncNode NULL");
2162 2163 2164 2165 2166 2167 2168 2169
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2170
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2171 2172 2173
    return;
  }

M
Minghao Li 已提交
2174
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2175 2176
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2177
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2178 2179 2180
    return;
  }

C
cadem 已提交
2181
  sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
2182

C
cadem 已提交
2183
  if (pSyncNode->totalReplicaNum > 1) {
M
Minghao Li 已提交
2184 2185 2186
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2187
    if (timerLogicClock == msgLogicClock) {
2188 2189 2190 2191 2192 2193
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2194 2195
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2196 2197 2198
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2199
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2200
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2201
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2202
        pSyncMsg->privateTerm = 0;
2203
        pSyncMsg->timeStamp = tsNow;
2204 2205 2206 2207 2208 2209

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
C
cadem 已提交
2210
        sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
2211 2212
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2213 2214 2215
      } else {
      }

M
Minghao Li 已提交
2216 2217
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2218 2219
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2220 2221 2222 2223
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2224
    } else {
M
Minghao Li 已提交
2225 2226
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2227 2228
    }
  }
2229 2230 2231

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2232 2233
}

2234 2235 2236 2237
// Deleter registered with the log-store LRU cache by syncCacheEntry.
// Frees the cached value; key, keyLen and the user-data pointer are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;
  taosMemoryFree(value);
}
2238

2239 2240 2241 2242
// Inserts pEntry into pLogStore's LRU cache.
//
// The cache key is pEntry->index and the charge is sizeof(SSyncRaftEntry) +
// dataLen. The entry is inserted at TAOS_LRU_PRIORITY_LOW with
// deleteCacheEntry registered as its deleter. The cache handle is handed back
// through h.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                              deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2254
// Appends a locally produced raft entry to the sync log buffer.
//
// Ownership of pEntry passes to this function. It is destroyed here in two
// cases:
//  - its payload is shorter than an SMsgHead;
//  - syncLogBufferAppend rejects it. The FSM is then informed of the failure
//    through syncFsmExecute with terrno.
// After a successful append, the match index is advanced with
// syncLogBufferProceed. With a single replica, the commit index is then moved
// up to that match index and the log buffer is committed immediately. With
// more replicas, the function returns right after the proceed step.
//
// Returns 0 on success, -1 on any failure.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // A client request must at least carry a message header.
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // Advance the match index, replicating when needed.
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  bool singleReplica = (ths->replicaNum <= 1);
  if (singleReplica) {
    // No peer has to acknowledge: commit up to the local match index right away.
    (void)syncNodeUpdateCommitIndex(ths, matchIndex);
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2295
// Decides whether heartbeat replies have timed out for a quorum of peers.
//
// Single-replica groups never time out. Otherwise each peer's last receive
// time is read from pMatchIndex. Peers whose recorded time is 0 or -1 are
// ignored. A peer counts as expired when more than tsHeartbeatTimeout ms have
// passed since its last receive time. Returns true when at least `quorum`
// peers are expired.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) return false;

  int64_t now = taosGetTimestampMs();
  int32_t expired = 0;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);
    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      ++expired;
    }
  }

  return expired >= pSyncNode->quorum;
}

2318 2319 2320
// Reports whether any of the node's snapshot senders has been started.
// Returns false for a NULL node; empty sender slots are skipped.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    if (pSyncNode->senders[idx] == NULL) continue;
    if (pSyncNode->senders[idx]->start) return true;
  }
  return false;
}

// Reports whether the node's new-node snapshot receiver exists and has been
// started. Returns false for a NULL node.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2337
// Appends a no-op entry, stamped with the current term, at the end index of
// the log buffer.
//
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the entry cannot be
// built. Otherwise returns the result of syncNodeAppend, so that callers
// observe append or commit failures instead of an unconditional 0.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend takes ownership of pEntry: it destroys the entry itself
  // when the append is rejected, so only its result needs forwarding.
  return syncNodeAppend(ths, pEntry);
}

// Older variant of syncNodeAppendNoop that writes through the raw log store
// (pLogStore) instead of the log buffer.
//
// A no-op entry is built at syncLogWriteIndex with the current term. On the
// leader, it is appended with syncLogAppendEntry and then cached with
// syncCacheEntry. Afterwards, the cache handle is released if one was
// obtained; otherwise the entry is destroyed.
// Returns 0 on success. Returns -1 if the entry cannot be built
// (terrno = TSDB_CODE_OUT_OF_MEMORY) or cannot be appended; in the latter
// case the entry is destroyed so it does not leak.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // The entry was neither stored nor cached; free it before bailing out.
      syncEntryDestroy(pEntry);
      return -1;
    }

    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2380 2381
// Handles a heartbeat received from a peer.
//
// A heartbeat from a sender outside this node's raft group is dropped: the
// function returns 0 and sends no reply. Otherwise a SyncHeartbeatReply is
// sent back, carrying the local term as it was on arrival. Along the way:
//  - a learner adopts a higher term carried by the heartbeat;
//  - a non-leader in the same term records the receive time in pNextIndex and
//    takes the sender's minMatchIndex. Its election timer is reset after the
//    reply. A follower or learner also enqueues a local commit command
//    (SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT);
//  - a leader seeing a term >= its own enqueues SYNC_LOCAL_CMD_STEP_DOWN.
// Local commands are only built when syncEqMsg and msgcb are both set, so an
// un-enqueueable message is never allocated and leaked.
//
// Returns 0, or -1 if the reply message cannot be allocated.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0) {
    // Without a reply buffer, rpcMsg.pCont must not be dereferenced.
    sError("vgId:%d, failed to build heartbeat reply since %s", ths->vgId, terrstr());
    return -1;
  }
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  // The reply carries the term as read above, before any learner update below.
  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sTrace("vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
         DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    bool isFollowerOrLearner = (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER);
    if (isFollowerOrLearner && ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, failed to build commit msg from heartbeat since %s", ths->vgId, terrstr());
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd =
            (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->commitIndex = pMsg->commitIndex;
        pSyncMsg->currentTerm = pMsg->term;

        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                 pMsg->commitIndex, pMsg->term);
        }
      }
    }
  }

  // A leader receiving a heartbeat with a term >= its own enqueues a step-down command.
  if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEADER && ths->syncEqMsg != NULL &&
      ths->msgcb != NULL) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, failed to build step-down msg since %s", ths->vgId, terrstr());
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->currentTerm = pMsg->term;
      pSyncMsg->commitIndex = pMsg->commitIndex;

      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}

2476
// Handles a heartbeat reply from a peer.
//
// Looks up the log replication manager for the sender and logs the reply with
// its round-trip delay. Then records the receive time in pMatchIndex and hands
// the reply to syncLogReplProcessHeartbeatReply.
// Returns -1 when the sender has no replication manager; otherwise returns the
// result of syncLogReplProcessHeartbeatReply.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "0x%016" prints the address as 16 zero-padded hex digits after a 0x prefix.
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}

2496
// Older heartbeat-reply handler: logs the reply with its round-trip delay and
// records when the sender last answered (in pMatchIndex). Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();

  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, tbuf);

  // The last reply time is what decides whether the peer is considered alive.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}

S
Shengliang Guan 已提交
2512 2513
// Executes a local command enqueued by this node (for example from heartbeat
// handling).
//
//  - SYNC_LOCAL_CMD_STEP_DOWN: steps down to the carried term.
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: if the log
//    buffer's last match term equals the carried term, raises the commit index
//    to the carried commitIndex. Then commits the log buffer up to
//    ths->commitIndex. Nothing is done when the log buffer is empty.
//  - anything else is logged as an error.
// Always returns 0; failures are only logged.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pMsg, "");

  switch (pMsg->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pMsg->currentTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
    case SYNC_LOCAL_CMD_LEARNER_CMT:
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        return 0;
      }
      if (pMsg->currentTerm == syncLogBufferGetLastMatchTerm(ths->pLogBuf)) {
        (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
      }
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2539 2540 2541 2542 2543 2544 2545 2546 2547 2548
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2549

2550
// Turns a client request into a raft entry placed at the end index of the log
// buffer, with the current term, and appends it when this node is leader.
//
// TDMT_SYNC_CLIENT_REQUEST messages are built with
// syncEntryBuildFromClientRequest; any other message type with
// syncEntryBuildFromRpcMsg. On the leader, *pRetIndex (when non-NULL) receives
// the entry's index before syncNodeAppend is called, and that call's result is
// returned.
// Returns -1 when the entry cannot be built or this node is not the leader; in
// the non-leader case the entry is destroyed.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  // syncNodeAppend owns pEntry from here on, including on failure.
  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2583 2584 2585
// Returns a static, human-readable name for a sync state; unrecognised values
// map to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    default:
      break;
  }

  return name;
}
2601

2602
// Locates this node inside pNewCfg and stores its position in
// pNewCfg->myIndex.
// A slot matches when its SYNC_ADDR together with this node's vgId equals
// ths->myRaftId. Returns 0 on the first match, -1 if this node is absent.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t slot = 0; slot < pNewCfg->totalReplicaNum; ++slot) {
    SRaftId candidate = {.addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]), .vgId = ths->vgId};
    if (!syncUtilSameId(&(ths->myRaftId), &candidate)) continue;

    pNewCfg->myIndex = slot;
    return 0;
  }

  return -1;
}

2618 2619 2620 2621
// True for a single-replica group handling a user-commit message, provided the
// node's vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2622
// Returns true when pRaftId matches one of the node's replicasId entries.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t i = 0; i < ths->totalReplicaNum && !found; ++i) {
    found = syncUtilSameId(&((ths->replicasId)[i]), pRaftId);
  }
  return found;
}

// Returns the snapshot sender stored at the replica slot whose id equals
// pDestId, or NULL when no slot matches. If several slots match, the last one
// wins.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  int32_t slot = -1;
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      slot = i;
    }
  }
  return (slot >= 0) ? (ths->senders)[slot] : NULL;
}
M
Minghao Li 已提交
2640

2641 2642
// Returns a pointer to the peer heartbeat timer at the replica slot whose id
// equals pDestId, or NULL when no slot matches. If several slots match, the
// last one wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t slot = -1;
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      slot = i;
    }
  }
  return (slot >= 0) ? &((ths->peerHeartbeatTimerArr)[slot]) : NULL;
}

M
Minghao Li 已提交
2651 2652
// Returns a pointer to the peer state at the replica slot whose id equals
// pDestId, or NULL when no slot matches. If several slots match, the last one
// wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  int32_t slot = -1;
  for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      slot = i;
    }
  }
  return (slot >= 0) ? &((ths->peerStates)[slot]) : NULL;
}

// Decides whether an AppendEntries message should be sent to pDestId.
//
// Returns false (and logs) when the peer has no state slot. It also returns
// false when the index this message would send (prevLogIndex + 1) equals the
// last sent index and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed
// since that send. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
  if (pState == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   sinceLastSend = taosGetTimestampMs() - pState->lastSendTime;

  bool duplicateInFlight = (pState->lastSendIndex == nextSendIndex) && (sinceLastSend < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !duplicateInFlight;
}

M
Minghao Li 已提交
2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691
// Checks whether the node is in a state that allows a change to proceed.
//
// Refuses (logging a distinct message for each case) when:
//  - a change is already in progress;
//  - the commit index is valid (>= SYNC_INDEX_BEGIN) but differs from the
//    last log index;
//  - a snapshot sender for any peer has been started.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool commitValid = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (commitValid && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    bool                 sending = (pSender != NULL && pSender->start);
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}