syncMain.c 83.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
C
cadem 已提交
40
#include "syncUtil.h"
M
Minghao Li 已提交
41

M
Minghao Li 已提交
42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
// Opens a sync node described by pSyncInfo, adds it through syncNodeAdd and then
// copies the ping/elect/heartbeat intervals and the message callbacks from
// pSyncInfo onto the node.
// Returns the rid assigned by syncNodeAdd, or -1 when the node cannot be opened
// or added (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
88
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
89 90 91 92
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

127 128 129 130 131 132 133 134
// Runs syncNodePostClose on the node identified by rid, holding a reference for
// the duration of the call. Does nothing if the node cannot be acquired.
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return;
  }

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
135 136 137
// A new config is accepted only if syncNodeInConfig() holds for it and its replica
// count differs from the node's current replica count by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return replicaDelta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
140
// Applies a new replica configuration to the sync node identified by rid.
//
// The config is first validated with syncNodeCheckNewConfig(); on rejection terrno
// is set to TSDB_CODE_SYN_NEW_CONFIG_ERROR. Otherwise the new config index is
// recorded and the config change is performed. If the node is currently leader,
// the heartbeat timer is stopped, every per-peer heartbeat timer slot is
// re-initialised against the (new) replicasId array, and the heartbeat timer is
// started again.
//
// Returns 0 on success, -1 if the node cannot be acquired or the config is invalid.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: once released, the node must not be
    // dereferenced any more.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // Set terrno last so that nothing called above can overwrite it.
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // Re-initialise all timer slots, not only the ones in use by the new config.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
168

S
Shengliang Guan 已提交
169 170 171 172
// Dispatches a sync RPC message to the handler matching pMsg->msgType on the node
// identified by rid.
//
// Returns the handler's result code. Returns -1 when the sync environment is not
// initialised, when the node cannot be acquired, or when the message type is not
// handled here (terrno is then set to TSDB_CODE_MSG_NOT_PROCESSED).
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // Log before releasing: the node must not be dereferenced once our reference
  // has been dropped.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
229
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
230
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
231
  if (pSyncNode == NULL) return -1;
232

S
Shengliang Guan 已提交
233
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
234
  syncNodeRelease(pSyncNode);
235 236 237
  return ret;
}

C
cadem 已提交
238 239
// Turns the node into a follower (reason string "force election") and answers the
// request with a success response that reuses the request's info and its
// pre-set rsp/rspLen payload. Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;

  tmsgSendRsp(&reply);
  return 0;
}

252
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
253
  SSyncNode* pNode = syncNodeAcquire(rid);
254
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
255 256

  SRpcMsg rpcMsg = {0};
257
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
258 259 260
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
261
  if (ret == 1) {
262
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
263
    rpcSendResponse(&rpcMsg);
264 265
    return 0;
  } else {
266
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
267
    return -1;
268
  }
S
Shengliang Guan 已提交
269 270
}

M
Minghao Li 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
// Returns the smallest match index recorded for any peer of pSyncNode, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum; later peers can only lower it
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

287
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
288
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
289
  if (pSyncNode == NULL) {
290
    sError("sync begin snapshot error");
291 292
    return -1;
  }
293

294 295 296 297 298 299 300 301 302 303
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

304
  int32_t code = 0;
305
  int64_t logRetention = 0;
306

M
Minghao Li 已提交
307
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
308
    // mnode
309
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
310 311 312 313
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
314 315 316
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
317

318 319 320 321 322 323
  if (pSyncNode->replicaNum > 1) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
324
    }
325
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
326 327
  }

M
Minghao Li 已提交
328
_DEL_WAL:
329

M
Minghao Li 已提交
330
  do {
331 332 333 334 335 336 337 338 339 340 341
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

342
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
343 344 345 346 347 348 349 350
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
351

M
Minghao Li 已提交
352
      } else {
353 354
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
355
      }
356
    }
M
Minghao Li 已提交
357
  } while (0);
358

S
Shengliang Guan 已提交
359
  syncNodeRelease(pSyncNode);
360 361 362 363
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
364
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
365
  if (pSyncNode == NULL) {
366
    sError("sync end snapshot error");
367 368 369
    return -1;
  }

370 371 372 373
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
374
    if (code != 0) {
375
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
376
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
377 378
      return -1;
    } else {
S
Shengliang Guan 已提交
379
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
380 381
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
382
  }
383

S
Shengliang Guan 已提交
384
  syncNodeRelease(pSyncNode);
385 386 387
  return code;
}

M
Minghao Li 已提交
388
// rid-based wrapper around syncNodeStepDown(pSyncNode, newTerm).
// Returns 0 after the call, or -1 if the node cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);

  return 0;
}

400
// Reports whether the node can serve reads: it must be non-NULL, be leader and
// have finished restoring. On a negative answer terrno is set to, in order of
// checking, TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER or
// TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  bool ready = false;

  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
  } else if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!pSyncNode->restoreFinish) {
    terrno = TSDB_CODE_SYN_RESTORING;
  } else {
    ready = true;
  }

  return ready;
}

// rid-based wrapper around syncNodeIsReadyForRead().
// Returns false if the node cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync ready for read error");
    return false;
  }

  bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);

  return isReady;
}
M
Minghao Li 已提交
432

433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
// rid-based wrapper around syncNodeSnapshotSending().
// Returns false if the node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return false;

  bool isSending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);

  return isSending;
}

// rid-based wrapper around syncNodeSnapshotRecving().
// Returns false if the node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return false;

  bool isReceiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);

  return isReceiving;
}

M
Minghao Li 已提交
455 456
// Picks a peer and asks syncNodeLeaderTransferTo() to hand leadership to it.
//
// With no peers the call fails with terrno = TSDB_CODE_SYN_ONE_REPLICA. If this
// node is not leader (or has a single replica) nothing is done and 0 is returned.
// The first peer is the default target; when there are exactly two peers, the
// second one is chosen if its match index is strictly greater.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  bool canTransfer = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->replicaNum > 1);
  if (!canTransfer) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}

M
Minghao Li 已提交
478 479
// Proposes a leader-transfer message that names newLeader as the next leader.
//
// Fails with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has a single
// replica, and with -1 when the transfer message cannot be built. Otherwise the
// result of syncNodePropose() is returned. The message content built here is
// freed before returning.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // NOTE(review): assumes syncBuildLeaderTransfer returns 0 on success, like the
  // other syncBuild* helpers used in this file - confirm. The pCont check guards
  // the dereference below in any case.
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

500 501
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
502

S
Shengliang Guan 已提交
503
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
504 505 506
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
507 508 509 510 511
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
512
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
513 514
  }

515
  return state;
M
Minghao Li 已提交
516 517
}

518
// Scans the recorded config indexes and returns the largest one that does not
// exceed snapshotLastApplyIndex. The first recorded index is the starting value
// and is returned when no later entry qualifies.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

534 535
// Fills pEpSet with one endpoint (fqdn, port) per replica of the node's raft
// config. numOfEps is reset to 0 first and stays 0 if the node cannot be
// acquired. When at least one endpoint was written, inUse is set to the entry
// following this node's own index (wrapping around).
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return;
  }

  int32_t idx = 0;
  while (idx < pNode->raftCfg.cfg.replicaNum) {
    SEp* pEp = &pEpSet->eps[idx];
    tstrncpy(pEp->fqdn, pNode->raftCfg.cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pNode->raftCfg.cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pEp->fqdn, pEp->port);
    ++idx;
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
555
// rid-based wrapper around syncNodePropose().
// Returns -1 if the node cannot be acquired, otherwise the wrapped call's result.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);

  return result;
}
M
Minghao Li 已提交
566

S
Shengliang Guan 已提交
567
// Proposes pMsg on a leader node.
//
// Rejections (return -1, terrno set):
//   - node is not leader                      -> TSDB_CODE_SYN_NOT_LEADER
//   - restore not finished and vgId != 1      -> TSDB_CODE_SYN_PROPOSE_NOT_READY
//   - syncNodeHeartbeatReplyTimeout() is true -> TSDB_CODE_SYN_PROPOSE_NOT_READY
//
// When syncNodeIsOptimizedOneReplica() holds, the request is handled inline via
// syncNodeOnClientRequest(); on success the apply index/term are written into
// pMsg->info.conn and 1 is returned, on failure terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned.
//
// Otherwise a response stub is registered, a client-request message is built and
// enqueued through pSyncNode->syncEqMsg, and that call's result is returned. The
// stub is removed again if building or enqueueing fails. If seq is non-NULL it
// receives the stub's sequence number whenever the enqueue step was reached.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored yet; vgId 1 is exempt from this check
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised so that the failure log below never reads an indeterminate
    // value when the handler bails out before writing the index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  // regular path: remember the request so that a response can be matched to it later
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
629
// Resets a per-peer heartbeat timer slot: no timer handle, counter and logic
// clock at zero, interval taken from the node's heartbeat baseline, callback set
// to syncNodeEqPeerHeartbeatTimer, destination set to destId and the timestamp
// set to the current time. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // identity of the timer
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state starts from scratch
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
640
// Arms the per-peer heartbeat timer described by pSyncTimer.
//
// The timer's SSyncHbTimerData record is looked up by pSyncTimer->hbDataRid and
// allocated + registered when no such record exists. The record is then filled
// with the node rid, the timer, destination, logic clock and the planned
// execution time, and the timer is (re)set to fire after
// timerMS / HEARTBEAT_TICK_NUM with the record's rid as its parameter.
//
// Returns 0 when the timer was armed and also when the sync environment is not
// initialised (only an error is logged in that case). Returns -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY if the record cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // Do not dereference a failed allocation; report it to the caller instead.
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, start ctrl hb timer error, out of memory", pSyncNode->vgId);
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    // the callback receives the record's rid, not a pointer to the record
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
666
// Stops a per-peer heartbeat timer: bumps the timer's logic clock, stops and
// clears the timer handle, removes the associated timer-data record and resets
// hbDataRid to -1. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

676
// Restores the log store from the FSM snapshot when the log range does not line
// up with the snapshot's lastApplyIndex, i.e. when the log ends before that index
// or begins more than one entry after it.
// Returns 0 when no restore is needed or the restore succeeds, -1 when
// syncLogRestoreFromSnapshot() reports a failure.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapInfo = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapInfo);

  SyncIndex commitIndex = snapInfo.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logBehindSnapshot = lastVer < commitIndex;
  bool logAheadOfSnapshot = firstVer > commitIndex + 1;
  if (!logBehindSnapshot && !logAheadOfSnapshot) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
696
// open/close --------------
S
Shengliang Guan 已提交
697 698
// Allocate and initialize a sync node from pSyncInfo.
//
// Steps, in order: make sure the sync directory exists, create or load
// raft_config.json, refresh dnode info of every replica, copy callbacks and
// handles from pSyncInfo, create the log ring buffer, derive raft ids for
// self / peers / replicas, open the raft store, create the vote and index
// managers and the log store, initialize commitIndex from the FSM snapshot,
// set up the timers, create snapshot senders/receiver, and initialize the
// replication manager, peer state and log buffer.
//
// Side effects on pSyncInfo:
//   - pSyncInfo->syncCfg is overwritten with the config read from file when
//     the file config is the one being used.
//   - pSyncInfo->pFsm is moved into the node and set to NULL. On failure the
//     FSM is freed, either here (not yet moved) or by syncNodeClose (moved).
//
// Returns the new node, or NULL on any failure. Every failure path goes
// through _error so that the partially built node is released.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set vgId first so that every log line below carries the real vgroup id
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      // the caller supplied a different, non-empty config: it wins and is persisted
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      // otherwise the file config wins and is reported back to the caller
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // the FSM is moved into the node here; from now on syncNodeClose frees it
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commitIndex starts from the FSM snapshot's last applied index, when available
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error so the node and the senders created so far are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // go through _error so the node and its senders are released
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo only if the failure happened before it was moved
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose accepts NULL (allocation failure) and partially built nodes
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1025 1026
// Raise pSyncNode->commitIndex to the FSM snapshot's last applied index when
// the snapshot is ahead. Does nothing if no FSM or no FpGetSnapshotInfo
// callback is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // commitIndex only ever moves forward here
  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1035
// Check that the log buffer lines up with the log store, then commit up to
// the larger of the node's commitIndex and the log store's commit index.
//
// Returns 0 on success. Returns -1 (terrno = TSDB_CODE_WAL_LOG_INCOMPLETE)
// when the log store has a last index other than -1 and the buffer's endIndex
// is not exactly one past it, or -1 when syncLogBufferCommit fails.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  const SyncIndex storeLastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  const SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  const SyncIndex bufEndIndex = pSyncNode->pLogBuf->endIndex;

  if (storeLastVer != -1 && bufEndIndex != storeLastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), bufEndIndex - 1, storeLastVer);
    return -1;
  }

  ASSERT(bufEndIndex == storeLastVer + 1);

  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) ? -1 : 0;
}

// Put the node into its initial raft role and start the ping timer.
//
// With exactly one replica the node advances the term, becomes leader and
// appends a no-op entry; otherwise it becomes follower.
//
// Returns the result of syncNodeStartPingTimer.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  const int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return code;
}

B
Benguang Zhao 已提交
1080
// Start the node in stand-by mode: force follower state, stop the heartbeat
// timers, re-arm the election timer with TIMER_MAX_MS, and start the ping
// timer.
//
// Returns -1 if restarting the election timer or starting the ping timer
// reports a negative code; otherwise returns the ping timer start result.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
1101
// First phase of shutting a sync node down: stop the election, heartbeat and
// ping timers (in that order), then clean the pending responses held by the
// response manager. Nothing is freed here.
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  // timers: elect, heartbeat, ping
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeStopPingTimer(pSyncNode);

  // pending responses
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1119 1120 1121
// Post-close phase: stop (if started) and destroy the new-node snapshot
// receiver, then clear the pointer. Does nothing when no receiver exists.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    // the message names the post-close phase so it is not mistaken for syncNodePreClose
    sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }
}

1132
// Release a heartbeat timer data object with taosMemoryFree.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1133

M
Minghao Li 已提交
1134
// Tear a sync node down and free it. A NULL node is accepted and ignored.
//
// Order: clean pending responses, stop ping/elect/heartbeat timers, destroy
// the replication manager, destroy the response manager, vote and index
// managers, log store and log buffer, stop and destroy every snapshot sender
// and the new-node receiver, free the FSM, close the raft store, and finally
// free the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers and replication
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplDestroy(pSyncNode);

  // managers and log storage; each pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // snapshot senders: skip empty slots, stop the running ones before destroying
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[i];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[i] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1192
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1193

M
Minghao Li 已提交
1194 1195 1196
// timer control --------------
// Arm the ping timer and copy the user logic clock into the timer logic clock.
//
// When the sync environment is not initialized only an error is logged.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: advance the user logic clock, stop the timer and clear
// its handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
//
// Records `ms` as electTimerMS, fills electTimerParam (execute time = now + ms,
// current elect logic clock, node pointer, NULL data) and resets the timer
// with the node's rid as the callback parameter.
//
// When the sync environment is not initialized only an error is logged.
// Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  const int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  // the callback receives the node's rid, not the node pointer
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);

  return 0;
}

// Stop the election timer: advance the elect logic clock, stop the timer and
// clear its handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and start it again with a timeout of `ms`
// milliseconds. The results of both steps are ignored; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);

  return 0;
}

1251
// Restart the election timer with a fresh timeout: TIMER_MAX_MS when the node
// is in stand-by, otherwise a random value obtained from syncUtilElectRandomMS
// with electBaseLine and 2 * electBaseLine as arguments.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1266
// Arm the node-level heartbeat timer and copy the user logic clock into the
// timer logic clock.
//
// When the sync environment is not initialized an error is logged instead.
// The trace line is emitted in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1280
// Start the per-peer heartbeat timers: for every peer whose timer can be
// looked up with syncNodeGetHbTimer, call syncHbTimerStart. Peers without a
// timer are skipped. Returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  // node-level heartbeat timer path, compiled out
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1298 1299
// Stop the per-peer heartbeat timers: for every peer whose timer can be
// looked up with syncNodeGetHbTimer, call syncHbTimerStop. Peers without a
// timer are skipped. Returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  // node-level heartbeat timer path, compiled out
#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return ret;
}

1317 1318 1319 1320 1321 1322
// Stop and then start the per-peer heartbeat timers. The results of both
// steps are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

1323 1324 1325 1326 1327 1328 1329 1330
// Send pMsg to the peer identified by destRaftId.
//
// The peer's endpoint set is found by matching destRaftId->addr against
// pNode->peersId. When a send callback is registered and the peer is known,
// the message content is converted with syncUtilMsgHtoN, marked as needing no
// response, and handed to pNode->syncSendMSg.
//
// Returns the callback's result, or -1 when the peer is unknown or no callback
// is registered. On any negative result the message content is freed here,
// pMsg->pCont is reset to NULL, and terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
      break;
    }
  }

  int32_t code = -1;
  if (pNode->syncSendMSg != NULL && epSet != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code < 0) {
    // log before freeing: destRaftId may point into the message content
    sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
           DID(destRaftId), destRaftId->addr, terrno);
    rpcFreeCont(pMsg->pCont);
    // do not leave a dangling pointer in the caller-owned message
    pMsg->pCont = NULL;
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  return code;
}

1349
// Report whether this node is one of the replicas listed in pCfg.
//
// Membership is evaluated twice: by comparing fqdn and port with the node's
// own info, and by comparing a raft id built from each entry with the node's
// own raft id. The two results are asserted to agree; the fqdn/port result is
// returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEp = false;
  bool foundByRaftId = false;

  // match on fqdn + port
  for (int32_t i = 0; i < pCfg->replicaNum && !foundByEp; ++i) {
    if (strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) != 0) {
      continue;
    }
    if (pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort) {
      foundByEp = true;
    }
  }

  // match on raft id
  for (int32_t i = 0; i < pCfg->replicaNum && !foundByRaftId; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundByRaftId = true;
    }
  }

  ASSERT(foundByEp == foundByRaftId);
  return foundByEp;
}

1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389
// Compare two sync configs.
//
// Returns true when the replica count or own index differs, or when any
// replica entry (up to the old replica count) differs in fqdn or port.
// Returns false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t i = 0; i < pOldCfg->replicaNum; ++i) {
    const SNodeInfo* pPrev = &pOldCfg->nodeInfo[i];
    const SNodeInfo* pNext = &pNewCfg->nodeInfo[i];

    const bool sameEndpoint = strcmp(pPrev->nodeFqdn, pNext->nodeFqdn) == 0 && pPrev->nodePort == pNext->nodePort;
    if (!sameEndpoint) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1390
// Install pNewConfig as the node's raft configuration.
//
// pSyncNode             - node whose raftCfg and derived state are updated
// pNewConfig            - configuration to install (copied by value)
// lastConfigChangeIndex - stored in raftCfg.lastConfigIndex and passed to
//                         syncAddCfgIndex()
//
// Nothing is done when the new configuration equals the current one. Otherwise
// the configuration is stored, the standby flag is adjusted, and:
//   * if this node is a member of the new configuration, the peer tables,
//     replica ids, quorum, index/vote managers and snapshot senders are
//     rebuilt, the cfg file is written, and the node re-enters its current
//     role (leader, followed by a noop append; otherwise follower);
//   * if it is not a member, only the cfg file is written.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the real vgroup id instead of a hard-coded "vgId:1"
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // membership is checked against myNodeInfo/myRaftId as they were before this change
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change (old replica count -> new replica count)
  sNInfo(pSyncNode, "begin do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId: every replica except myself, in config order
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move each surviving sender to the slot of the replica it served before
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;  // ownership moved; must not be destroyed below
          bool reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new: fill every slot that did not inherit an old sender
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not moved into the new table
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
}

M
Minghao Li 已提交
1561 1562
// raft state change --------------
// If `term` is newer than the stored term: store it, become follower (with a
// reason string naming the new term) and clear the recorded vote.
// A term that is not newer is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1572
// Store `term` when it is newer than the stored term; the node's role and
// vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm storedTerm = raftStoreGetTerm(pSyncNode);
  if (term <= storedTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1578
// Step down in response to `newTerm`.
//   newTerm <  current term : ignored (trace only).
//   newTerm == current term : become follower unless already a follower.
//   newTerm >  current term : store the term, become follower, clear the vote.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);

  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (currentTerm == newTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // currentTerm < newTerm
  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1603 1604
// Hand the node's response manager to syncRespCleanRsp().
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1605
// Switch the node to the follower role.
//
// pSyncNode - node to transition
// debugStr  - free-form reason, only used in the trace log
//
// Steps, in order: drop the leader cache if we were leader, zero hbSlowNum,
// set state to follower and stop the heartbeat timer, clean pending responses,
// invoke the FSM's become-follower callback (if set), invalidate
// minMatchIndex, reset the log buffer, and reset the election timer.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  // (only when we were the leader ourselves, i.e. the cache pointed at us)
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1656
// Switch the node to the leader role.
//
// pSyncNode - node to transition
// debugStr  - free-form reason, only used in the info log
//
// Records the time of the transition, bumps becomeLeaderNum, clears
// restoreFinish, points the leader cache at ourselves, initialises every
// nextIndex slot to lastIndex + 1 and every matchIndex slot to
// SYNC_INDEX_INVALID, re-initialises the peer states, stops a running
// new-node snapshot receiver, swaps the election timer for the heartbeat
// timer, sends heartbeats immediately, invokes the FSM's become-leader
// callback (if set), and finally resets minMatchIndex and the log buffer.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex[i] = last index + 1 for every replica slot
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition. Asserts the node is a candidate, refuses
// (with an error log) when the granted votes are not a majority, otherwise
// becomes leader and appends a noop entry. A failed noop append is logged
// but does not undo the transition.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t appendCode = syncNodeAppendNoop(pSyncNode);
  if (appendCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      lastLogIndex = pStore->syncLogLastIndex(pStore);
  ASSERT(lastLogIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);
}

M
Minghao Li 已提交
1752 1753
// True when the node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  if (pSyncNode->vgId == 1) {
    return true;
  }
  return false;
}

M
Minghao Li 已提交
1754
// Reset every peerStates slot: lastSendIndex becomes SYNC_INDEX_INVALID and
// lastSendTime becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate transition. Asserts the node is currently a follower,
// flips the state and logs term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      lastLogIndex = pStore->syncLogLastIndex(pStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition. Asserts the node is currently the leader,
// delegates to syncNodeBecomeFollower() and logs term, commit index and last
// log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      lastLogIndex = pStore->syncLogLastIndex(pStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition. Asserts the node is currently a
// candidate, delegates to syncNodeBecomeFollower() and logs term, commit
// index and last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SSyncLogStore* pStore = pSyncNode->pLogStore;
  SyncIndex      lastLogIndex = pStore->syncLogLastIndex(pStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1793 1794
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1795
// Record a vote for pRaftId. Asserts that `term` equals the stored term and
// that no vote has been recorded yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == raftStoreGetTerm(pSyncNode));

  bool alreadyVoted = raftStoreHasVoted(pSyncNode);
  ASSERT(!alreadyVoted);

  raftStoreVote(pSyncNode, pRaftId);
}

M
Minghao Li 已提交
1803
// simulate get vote from outside
1804 1805
// Vote for ourselves in `currentTerm`, then feed a locally built, granted
// RequestVoteReply (src == dest == myRaftId) into the vote-granted and
// votes-respond trackers. If the reply cannot be built the function returns
// after the vote has been recorded. The reply buffer is freed before return.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg selfReply = {0};
  if (syncBuildRequestVoteReply(&selfReply, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = selfReply.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  rpcFreeCont(selfReply.pCont);
}

M
Minghao Li 已提交
1822
// return if has a snapshot
M
Minghao Li 已提交
1823 1824
// Ask the FSM for snapshot info and report whether its lastApplyIndex is at
// least SYNC_INDEX_BEGIN. Returns false when the FSM has no
// FpGetSnapshotInfo callback.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  return snap.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
1835 1836
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1837
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1838
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1839 1840
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1841 1842 1843 1844 1845 1846 1847
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1848 1849
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1850 1851
// Term belonging to the last index. Without a snapshot this is the log
// store's last term. With a snapshot it is the log store's last term when the
// log extends past the snapshot's lastApplyIndex, and the snapshot's
// lastApplyTerm otherwise.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLast <= snap.lastApplyIndex) {
    return snap.lastApplyTerm;
  }
  return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex() and
// syncNodeGetLastTerm(). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = idx;

  SyncTerm term = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = term;

  return 0;
}
M
Minghao Li 已提交
1880

M
Minghao Li 已提交
1881
// return append-entries first try index
M
Minghao Li 已提交
1882 1883 1884 1885 1886
// One past the value returned by syncNodeGetLastIndex().
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
1887 1888
// if index > 0, return index - 1
// else, return -1
1889 1890 1891 1892 1893 1894 1895 1896 1897
// index - 1, clamped from below at SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prev = index - 1;
  return (prev < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prev;
}

M
Minghao Li 已提交
1898 1899 1900 1901
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1902 1903 1904 1905 1906 1907 1908 1909 1910
// Term of the entry preceding `index`.
//   index <  SYNC_INDEX_BEGIN : SYNC_TERM_INVALID
//   index == SYNC_INDEX_BEGIN : 0
//   otherwise                 : term of entry (index - 1), looked up in the
//                               log store's LRU cache first and in the log
//                               store itself on a miss; if the entry cannot
//                               be read, the snapshot's lastApplyTerm is used
//                               when the snapshot ends exactly at index - 1.
// Returns SYNC_TERM_INVALID (after an error log) when none of these apply.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) return SYNC_TERM_INVALID;
  if (index == SYNC_INDEX_BEGIN) return 0;

  SyncIndex       prevIndex = index - 1;
  SSyncRaftEntry* pPrevEntry = NULL;
  SLRUCache*      pLru = pSyncNode->pLogStore->pCache;
  LRUHandle*      hCached = taosLRUCacheLookup(pLru, &prevIndex, sizeof(prevIndex));
  int32_t         rc = 0;

  if (hCached == NULL) {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, prevIndex);

    rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, prevIndex, &pPrevEntry);
  } else {
    pPrevEntry = (SSyncRaftEntry*)taosLRUCacheValue(pLru, hCached);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", prevIndex, pPrevEntry->bytes, pPrevEntry);
  }

  if (rc == 0) {
    ASSERT(pPrevEntry != NULL);
    SyncTerm prevTerm = pPrevEntry->term;

    // a cached entry is handed back to the cache; one read from the log store is destroyed
    if (hCached != NULL) {
      taosLRUCacheRelease(pLru, hCached, false);
    } else {
      syncEntryDestroy(pPrevEntry);
    }

    return prevTerm;
  }

  // entry not available: fall back to the snapshot boundary
  SSnapshot snap = {.data = NULL,
                    .lastApplyIndex = SYNC_INDEX_INVALID,
                    .lastApplyTerm = SYNC_TERM_INVALID,
                    .lastConfigIndex = SYNC_INDEX_INVALID};

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
    if (snap.lastApplyIndex == prevIndex) {
      return snap.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snap.lastApplyIndex, snap.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1961 1962 1963 1964

// get pre index and term of "index"
// Fill *pPreIndex and *pPreTerm from syncNodeGetPreIndex() and
// syncNodeGetPreTerm() for `index`. Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
1969
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1970
  if (!syncIsInit()) return;
M
Minghao Li 已提交
1971

S
Shengliang Guan 已提交
1972 1973 1974
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1975
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1976 1977
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
1978
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
1979 1980
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1981
    }
M
Minghao Li 已提交
1982

M
Minghao Li 已提交
1983
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
1984 1985
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
1986
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
1987 1988
      rpcFreeCont(rpcMsg.pCont);
      return;
1989
    }
M
Minghao Li 已提交
1990

S
Shengliang Guan 已提交
1991
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
1992
  }
M
Minghao Li 已提交
1993 1994
}

M
Minghao Li 已提交
1995
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1996
  if (!syncIsInit()) return;
M
Minghao Li 已提交
1997

M
Minghao Li 已提交
1998 1999
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2000

2001
  if (pNode == NULL) return;
M
Minghao Li 已提交
2002 2003 2004 2005 2006

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2007

2008
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2009 2010 2011 2012
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2013

S
Shengliang Guan 已提交
2014
  SRpcMsg rpcMsg = {0};
2015 2016
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2017

S
Shengliang Guan 已提交
2018
  if (code != 0) {
M
Minghao Li 已提交
2019
    sError("failed to build elect msg");
M
Minghao Li 已提交
2020
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2021
    return;
M
Minghao Li 已提交
2022 2023
  }

S
Shengliang Guan 已提交
2024
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2025
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2026 2027 2028

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2029
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2030
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2031
    syncNodeRelease(pNode);
2032
    return;
M
Minghao Li 已提交
2033
  }
M
Minghao Li 已提交
2034 2035

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2036 2037
}

M
Minghao Li 已提交
2038
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2039
  if (!syncIsInit()) return;
2040

S
Shengliang Guan 已提交
2041 2042 2043 2044
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2045
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2046 2047 2048
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2049
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2050
        return;
2051
      }
M
Minghao Li 已提交
2052

2053
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2054 2055
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2056
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2057 2058
        rpcFreeCont(rpcMsg.pCont);
        return;
2059
      }
S
Shengliang Guan 已提交
2060 2061 2062 2063

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2064
    } else {
S
Shengliang Guan 已提交
2065 2066
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2067
    }
M
Minghao Li 已提交
2068 2069 2070
  }
}

2071
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2072
  int64_t hbDataRid = (int64_t)param;
2073
  int64_t tsNow = taosGetTimestampMs();
2074

2075 2076
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2077
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2078 2079
    return;
  }
2080

2081
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2082
  if (pSyncNode == NULL) {
2083
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2084
    sError("hb timer get pSyncNode NULL");
2085 2086 2087 2088 2089 2090 2091 2092
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2093
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2094 2095 2096
    return;
  }

M
Minghao Li 已提交
2097
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2098 2099
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2100
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2101 2102 2103
    return;
  }

M
Minghao Li 已提交
2104
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2105 2106

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2107 2108 2109
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2110
    if (timerLogicClock == msgLogicClock) {
2111 2112 2113 2114 2115 2116
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2117 2118
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2119 2120 2121
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2122
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2123
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2124
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2125
        pSyncMsg->privateTerm = 0;
2126
        pSyncMsg->timeStamp = tsNow;
2127 2128 2129 2130 2131 2132

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2133 2134
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2135 2136 2137
      } else {
      }

M
Minghao Li 已提交
2138 2139
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2140 2141
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2142 2143 2144 2145
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2146
    } else {
M
Minghao Li 已提交
2147 2148
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2149 2150
    }
  }
2151 2152 2153

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2154 2155
}

2156 2157
// Deleter callback handed to taosLRUCacheInsert(): releases the cached value.
// The key and its length are not needed to free the value.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2158 2159 2160 2161
// Insert a raft entry into the log store's LRU cache, keyed by the entry index.
//
// pLogStore: log store owning the cache (pLogStore->pCache).
// pEntry:    entry to cache; deleteCacheEntry is registered as its deleter.
// h:         passed through to taosLRUCacheInsert() as the handle out-parameter.
//
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // size reported to the cache: entry struct plus its payload bytes
  int32_t cacheBytes = sizeof(*pEntry) + pEntry->dataLen;

  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              cacheBytes, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2173
// Append a raft entry to the node's log buffer and advance the match index.
//
// ths:    sync node whose log buffer receives the entry.
// pEntry: entry to append. It is destroyed here when it is rejected (payload
//         shorter than SMsgHead) or when the log buffer refuses it.
//
// For a single-replica group the commit index is updated and the log buffer is
// committed immediately; with more than one replica the function returns right
// after proceeding the buffer.
//
// Returns 0 on success, -1 on rejection, enqueue failure, or commit failure.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // a valid client request always carries at least a message head
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // append to log buffer
  int32_t appendRet = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendRet < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    // report the failure (terrno) through the FSM before dropping the entry
    (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  // only a single-replica group commits right here
  if (ths->replicaNum <= 1) {
    (void)syncNodeUpdateCommitIndex(ths, matchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2214
// Tell whether heartbeat replies from at least `quorum` peers are overdue.
//
// A peer counts as timed out when its recorded receive time (looked up in
// pMatchIndex) is older than tsHeartbeatTimeout. Peers whose receive time is
// 0 or -1 are skipped. A single-replica node never times out.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int64_t nowMs = taosGetTimestampMs();
  int32_t timedOutPeers = 0;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecvMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = (lastRecvMs != 0 && lastRecvMs != -1);

    if (hasRecvTime && nowMs - lastRecvMs > tsHeartbeatTimeout) {
      timedOutPeers++;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255
// Return true when any snapshot sender slot of the node is non-NULL and has
// its `start` flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->replicaNum; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}

// Return true when the node has a snapshot receiver (pNewNodeReceiver) whose
// `start` flag is set. A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2256
// Build a no-op raft entry at the end of the log buffer, stamped with the
// current term, and append it through syncNodeAppend().
//
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
// built; otherwise returns the result of syncNodeAppend() so that an append
// failure is no longer reported to the caller as success.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend() destroys the entry itself on its failure paths
  return syncNodeAppend(ths, pEntry);
}

// Legacy no-op append: build a no-op entry at the log store's write index and,
// when this node is the leader, append it to the log store and cache it.
//
// Entry lifetime: if caching produced an LRU handle, the handle is released
// here; otherwise the entry is destroyed here. That includes the append-failure
// path, which previously returned without freeing it.
//
// Returns 0 on success (also when the node is not leader), -1 when the log
// store rejects the entry.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // NOTE(review): assumes the log store does not take ownership of the entry on
      // failure, matching the success path where the caller still owns it.
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2299 2300
// Handle an incoming heartbeat message.
//
// Steps, in order:
//   1. Log the receipt; drop the message (return 0) when the sender is not in
//      this node's raft group.
//   2. Build the heartbeat reply addressed to the sender.
//   3. If the heartbeat carries the current term and this node is not leader:
//      record the receive time, remember to reset the election timer, adopt the
//      sender's minMatchIndex, and - as follower - enqueue a
//      SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the sender's commit
//      index and term.
//   4. If the heartbeat term is >= the current term and this node is not
//      follower: enqueue a SYNC_LOCAL_CMD_STEP_DOWN local command.
//   5. Send the reply and, if flagged in step 3, reset the election timer.
//
// A local command that is not successfully enqueued (callback missing or
// enqueue error) is freed here. After a successful enqueue the message is not
// read again; values needed for logging are taken from copies or from pMsg.
//
// Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;
      // copy before enqueue: the message must not be read once it is handed over
      SyncIndex fcIndex = pSyncMsg->commitIndex;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
        }
      } else {
        // nobody took the message; release it instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        // log from pMsg (same value as pSyncMsg->currentTerm); pSyncMsg now belongs to the queue
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nobody took the message; release it instead of leaking it
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}

2386
// Handle a heartbeat reply from a peer.
//
// Looks up the log replication manager for the replying peer, logs the receipt
// with the elapsed time since the reply's timestamp, records the receive time in
// pMatchIndex, and forwards the reply to syncLogReplProcessHeartbeatReply().
//
// Returns -1 when no replication manager exists for the peer; otherwise the
// result of syncLogReplProcessHeartbeatReply().
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" PRIx64 zero-pads the address to 16 hex digits (the old text printed a literal "016")
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}

2406
// Legacy heartbeat-reply handler: log the reply and remember when it arrived.
//
// The recorded receive time (stored in pMatchIndex under the sender's id) is
// what later decides whether the peer is considered alive. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();
  int64_t             elapsedMs = nowMs - pReply->timeStamp;
  syncLogRecvHeartbeatReply(ths, pReply, elapsedMs, tbuf);

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}

S
Shengliang Guan 已提交
2422 2423
// Execute a local command message on the sync node.
//
//   SYNC_LOCAL_CMD_STEP_DOWN:    step down to the term carried by the message.
//   SYNC_LOCAL_CMD_FOLLOWER_CMT: if the log buffer is non-empty, update the commit
//                                index (only when the message term equals the last
//                                matched term) and commit the log buffer up to the
//                                node's commit index.
//   anything else:               logged as an error.
//
// Always returns 0; failures are only logged.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->currentTerm);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    sError("error local cmd");
    return 0;
  }

  // follower commit
  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (lastMatchTerm == pCmd->currentTerm) {
    (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
  }

  int32_t commitRet = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex);
  if (commitRet < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}

M
Minghao Li 已提交
2449 2450 2451 2452 2453 2454 2455 2456 2457 2458
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2459

2460
// Turn a client request into a raft entry and append it when this node is leader.
//
// ths:       sync node handling the request.
// pMsg:      request message. TDMT_SYNC_CLIENT_REQUEST messages are converted with
//            syncEntryBuildFromClientRequest(); any other type with
//            syncEntryBuildFromRpcMsg().
// pRetIndex: optional out-parameter; on the leader it receives the index assigned
//            to the entry (the log buffer's end index) before the append.
//
// Returns -1 when the entry cannot be built or the node is not leader (the entry
// is destroyed in that case); otherwise the result of syncNodeAppend().
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  // only the leader may append client requests
  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2493 2494 2495
// Map a sync state to its lowercase display name; states without a dedicated
// name map to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    default:
      break;
  }

  return name;
}
2509

2510
// Locate this node inside a new configuration and store its position.
//
// For each replica in pNewCfg a raft id is formed from the replica's address
// (SYNC_ADDR) and this node's vgId; the first one equal to ths->myRaftId sets
// pNewCfg->myIndex.
//
// Returns 0 when found, -1 when this node is not part of pNewCfg.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return -1;
}

2526 2527 2528 2529
// True only when all three hold: the group has exactly one replica, the message
// type satisfies syncUtilUserCommit(), and the vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

2530
// Return true when pRaftId equals one of the node's replica ids.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t slot = 0;

  while (slot < ths->replicaNum) {
    if (syncUtilSameId(&((ths->replicasId)[slot]), pRaftId)) {
      return true;
    }
    ++slot;
  }

  return false;
}

// Return the snapshot sender stored at the replica slot whose id equals pDestId,
// or NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  // Scan from the back: this yields the highest matching slot, exactly what a
  // forward scan that keeps overwriting its result would return.
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return (ths->senders)[slot];
    }
  }

  return NULL;
}
M
Minghao Li 已提交
2548

2549 2550
// Return a pointer to the peer heartbeat timer at the replica slot whose id
// equals pDestId, or NULL when no replica id matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  // Scan from the back: this yields the highest matching slot, exactly what a
  // forward scan that keeps overwriting its result would return.
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return &((ths->peerHeartbeatTimerArr)[slot]);
    }
  }

  return NULL;
}

M
Minghao Li 已提交
2559 2560
// Return a pointer to the peer state at the replica slot whose id equals
// pDestId, or NULL when no replica id matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  // Scan from the back: this yields the highest matching slot, exactly what a
  // forward scan that keeps overwriting its result would return.
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return &((ths->peerStates)[slot]);
    }
  }

  return NULL;
}

// Decide whether an append-entries message should be sent to a peer.
//
// Returns false when the peer has no state slot (logged as a possibly dropped
// replica), or when the same index (pMsg->prevLogIndex + 1) was already sent to
// this peer less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextSendIndex);
  bool sentRecently = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  // suppress only a recent duplicate of the very same index
  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599
// Check whether the node is currently allowed to change.
//
// The change is refused (false, with an error log) when:
//   - the node's `changing` flag is already set;
//   - the commit index is at least SYNC_INDEX_BEGIN but differs from the node's
//     last index;
//   - a snapshot sender for any peer has its `start` flag set.
// Otherwise returns true.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    bool                 sendingSnapshot = (pPeerSender != NULL && pPeerSender->start);
    if (sendingSnapshot) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}