syncMain.c 87.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
M
Minghao Li 已提交
48

49
/*
 * Create a sync node from pSyncInfo, register it, and copy the timer/callback
 * settings into it.
 *
 * Returns the node's rid (>= 0) on success, or -1 when the node cannot be
 * opened or registered (in the latter case the node is closed again).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // Register the node; a negative rid means registration failed.
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Timer baselines and the message callback table come straight from the caller.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
72 73 74 75
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
76 77 78
  }
}

M
Minghao Li 已提交
79
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
80
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
81
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
82
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
83
    syncNodeRemove(rid);
M
Minghao Li 已提交
84
  }
S
Shengliang Guan 已提交
85
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
89 90 91 92
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
93 94
}

S
Shengliang Guan 已提交
95 96 97
/*
 * Validate a proposed configuration: the local node must be part of it, and
 * the replica count may differ from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int delta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
  return delta <= 1;
}

S
Shengliang Guan 已提交
100
/*
 * Apply a new cluster configuration to the sync node identified by rid.
 *
 * On a leader, the heartbeat timers are re-initialised against the new
 * replica ids and replication is kicked off.
 *
 * Returns 0 on success, or -1 if rid is unknown or the configuration is
 * rejected (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: pSyncNode must not be touched
    // after syncNodeRelease(), as the node may be freed by then.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // Rebind every heartbeat timer slot to the (possibly changed) replica ids.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
128

S
Shengliang Guan 已提交
129 130 131 132
/*
 * Dispatch an incoming sync RPC message to the handler for its msgType.
 *
 * Message types with a dedicated struct are decoded, handled, and destroyed
 * here. Returns the handler's result, or -1 when the sync env is not
 * initialised, rid is unknown, or the message type is not recognised.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    code = syncNodeOnTimer(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    // Previously the result was dropped, leaving code at -1 even on success.
    code = syncNodeOnRequestVote(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);
  } else {
    sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
183
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
184
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
185
  if (pSyncNode == NULL) return -1;
186

S
Shengliang Guan 已提交
187
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
188
  syncNodeRelease(pSyncNode);
189 190 191
  return ret;
}

M
Minghao Li 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
/*
 * Smallest match index recorded for any peer of pSyncNode.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  // Seed with the first peer, then scan the rest.
  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; peer++) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    if (candidate < lowest) {
      lowest = candidate;
    }
  }
  return lowest;
}

208
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
209
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
210
  if (pSyncNode == NULL) {
211
    sError("sync begin snapshot error");
212 213
    return -1;
  }
214

215 216
  int32_t code = 0;

M
Minghao Li 已提交
217
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
218 219 220
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
221 222 223
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
224 225 226
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
227 228
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
229
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
230 231 232
      return 0;
    }

M
Minghao Li 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
250 251 252 253
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
254 255
            } while (0);

S
Shengliang Guan 已提交
256
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
257 258 259 260 261 262
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
263 264 265
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
266
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
267 268 269 270
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
271
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
272
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
273 274 275
        return 0;

      } else {
S
Shengliang Guan 已提交
276
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
277
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
278 279 280 281 282 283 284 285 286
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
287 288 289
    }
  }

M
Minghao Li 已提交
290
_DEL_WAL:
291

M
Minghao Li 已提交
292
  do {
293 294 295 296
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
297
      pSyncNode->snapshottingTime = taosGetTimestampMs();
298

M
Minghao Li 已提交
299 300 301
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
S
Shengliang Guan 已提交
302 303
        sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
304
      } else {
S
Shengliang Guan 已提交
305
        sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
S
Shengliang Guan 已提交
306
                terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
307 308
        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
309 310

    } else {
S
Shengliang Guan 已提交
311 312
      sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
              snapshottingIndex, lastApplyIndex);
313
    }
M
Minghao Li 已提交
314
  } while (0);
315

S
Shengliang Guan 已提交
316
  syncNodeRelease(pSyncNode);
317 318 319 320
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
321
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
322
  if (pSyncNode == NULL) {
323
    sError("sync end snapshot error");
324 325 326
    return -1;
  }

327 328 329 330
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
331
    if (code != 0) {
332
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
333
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
334 335
      return -1;
    } else {
S
Shengliang Guan 已提交
336
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
337 338
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
339
  }
340

S
Shengliang Guan 已提交
341
  syncNodeRelease(pSyncNode);
342 343 344
  return code;
}

M
Minghao Li 已提交
345
/* Make the node identified by rid step down at newTerm; -1 if rid is unknown. */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

357 358 359
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
360
    sError("sync ready for read error");
361 362 363 364 365 366 367 368 369 370
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
371 372 373
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
374

375 376 377 378
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
        SSyncRaftEntry* pEntry = NULL;
        int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(
379
            pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
380 381 382 383 384 385 386
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

          syncEntryDestory(pEntry);
        }
387 388 389 390
      }
    }
  }

391 392 393 394 395 396 397 398
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

399 400 401 402
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
403 404
/*
 * Transfer leadership to the first peer.
 * Fails with TSDB_CODE_SYN_ONE_REPLICA when there are no peers. Returns 0
 * without doing anything when this node is not the leader.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[0]);
}

/*
 * Propose a leader-transfer message naming newLeader as the next leader.
 *
 * Returns the syncNodePropose() result. Returns -1 when the group has a
 * single replica (TSDB_CODE_SYN_ONE_REPLICA) or the message cannot be built.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  // Check before the first dereference: the old ASSERT came after pMsg was
  // already written to, and is presumably compiled out in release builds.
  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  if (pMsg == NULL) {
    // NOTE(review): assumes build failure means allocation failure — confirm.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sNError(pSyncNode, "failed to build leader transfer msg to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

441 442
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
443

S
Shengliang Guan 已提交
444
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
445 446 447 448
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
449 450
  }

451
  return state;
M
Minghao Li 已提交
452 453
}

454
// NOTE(review): the snapshot query helpers below are compiled out with `#if 0`.
// The reason is not visible in this file — confirm before re-enabling or deleting them.
#if 0
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  // Same scan as syncNodeGetSnapshotConfigIndex(); reuse it rather than duplicating the loop.
  sMeta->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, snapshotIndex);
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
525

526 527 528 529
/*
 * Return the largest recorded config index that is <= snapshotLastApplyIndex.
 * configIndexArr[0] is the starting candidate: it is returned even when it
 * exceeds snapshotLastApplyIndex and no later entry qualifies.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = pSyncNode->pRaftCfg->configIndexArr[0];

  // Element 0 can never beat itself, so the scan starts at 1.
  for (int32_t k = 1; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = pSyncNode->pRaftCfg->configIndexArr[k];
    bool      isNewer = candidate > best;
    bool      isCovered = candidate <= snapshotLastApplyIndex;
    if (isNewer && isCovered) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

S
Shengliang Guan 已提交
542
/*
 * Fill pEpSet with every replica endpoint from the node's raft config.
 * inUse is set to the replica after this node (myIndex + 1, wrapping).
 * If rid is unknown, pEpSet is left with numOfEps == 0.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < pNode->pRaftCfg->cfg.replicaNum) {
    SEp* pTarget = &pEpSet->eps[idx];
    tstrncpy(pTarget->fqdn, pNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pTarget->port = pNode->pRaftCfg->cfg.nodeInfo[idx].nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pTarget->fqdn, pTarget->port);
    ++idx;
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}
562

M
Minghao Li 已提交
563
/* Propose pMsg on the node identified by rid; see syncNodePropose for return values. */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak);
    syncNodeRelease(pNode);
    return result;
  }

  sError("sync propose error");
  return -1;
}

575
/*
 * Propose a client message to the raft group led by pSyncNode.
 *
 * Returns:
 *    1  the message was handled synchronously on the optimized single-replica
 *       path; pMsg->info.conn.applyIndex/applyTerm are filled in;
 *    0  the message was enqueued for asynchronous processing;
 *   -1  (or the enqueue error) on failure, with terrno set for the
 *       not-leader / not-ready / internal-error cases.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised so the failure log below never reads an indeterminate value
    // if syncNodeOnClientRequest fails before writing it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  // Regular path: remember the caller's message so the response can be routed
  // back, then enqueue a client-request wrapper for the sync thread.
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  return code;
}

S
Shengliang Guan 已提交
628
/*
 * Reset a per-peer heartbeat timer slot to target destId.
 * The timer is left unarmed, with the node's heartbeat baseline as its period
 * and a zeroed logical clock. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
638
/*
 * Arm a per-peer heartbeat timer.
 *
 * A fresh SSyncHbTimerData is allocated for the callback and stored in
 * pSyncTimer->pData. It records the node, the timer, the destination and the
 * logical clock at start time.
 *
 * Returns 0 on success or when the sync env is stopped (logged only, as
 * before), and -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if allocation fails.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
  if (pData == NULL) {
    // Previously the NULL result was dereferenced immediately.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start ctrl hb timer since out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
655
/*
 * Disarm a per-peer heartbeat timer.
 *
 * The timer's logic clock is advanced before the timer is stopped. Callback data captures the
 * clock value at arming time (see syncHbTimerStart), presumably so a late callback can tell it
 * is stale. Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // pSyncTimer->pData is deliberately not freed here (the free was disabled).
  // NOTE(review): confirm which side owns and releases it.
  return 0;
}

S
Shengliang Guan 已提交
664 665
/*
 * Create and initialize a sync (raft) node from pSyncInfo.
 *
 * Steps, in order:
 *   1. make sure pSyncInfo->path exists;
 *   2. create raft_config.json from pSyncInfo if it does not exist yet. Otherwise reconcile it with
 *      pSyncInfo->syncCfg: a non-empty, changed input config is written to the file; otherwise the
 *      file's config is copied back into pSyncInfo->syncCfg;
 *   3. load the config, validate it and derive this node's, the peers' and all replicas' raft ids;
 *   4. open the raft store, vote/index managers and log store, and seed commitIndex from the FSM
 *      snapshot when the FSM provides FpGetSnapshotInfo;
 *   5. initialize (but do not start) the timers, the response manager, the snapshot
 *      senders/receiver and misc state.
 *
 * Ownership: on success pSyncInfo->pFsm is moved into the node and pSyncInfo->pFsm becomes NULL.
 * On failure pSyncInfo->pFsm (if still held by pSyncInfo) is freed and set to NULL, the partially
 * built node is released with syncNodeClose() and NULL is returned.
 * Raft itself is started later (see syncNodeStart()).
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set the vgroup id before anything logs it. It used to be assigned only after the raft config
  // had been created/reconciled, so every log line of that phase reported vgId:0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // pSyncNode->pRaftCfg has not been opened on this path (it is still NULL), so the former
      // copy-back from pSyncNode->pRaftCfg->cfg dereferenced NULL. The file was just written from
      // pSyncInfo->syncCfg, so presumably there is nothing newer to copy back.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  // bounded copy: stops at the source's terminator instead of copying sizeof(pSyncNode->path) bytes
  snprintf(pSyncNode->path, sizeof(pSyncNode->path), "%s", pSyncInfo->path);
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  // configPath was already formatted above with the same arguments

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // myIndex and replicaNum index nodeInfo[] and the peer/replica arrays below (presumably sized
  // TSDB_MAX_REPLICA, the bound used for the per-peer timer loop); reject values outside them.
  if (pSyncNode->pRaftCfg->cfg.replicaNum > TSDB_MAX_REPLICA || pSyncNode->pRaftCfg->cfg.myIndex < 0 ||
      pSyncNode->pRaftCfg->cfg.myIndex >= pSyncNode->pRaftCfg->cfg.replicaNum) {
    sError("vgId:%d, invalid raft cfg at %s, replica:%d selfIndex:%d", pSyncNode->vgId, pSyncNode->configPath,
           pSyncNode->pRaftCfg->cfg.replicaNum, pSyncNode->pRaftCfg->cfg.myIndex);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;  // ownership moves to the node
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commitIndex starts at the FSM snapshot's last applied index, if there is one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  sNTrace(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // pSyncInfo->pFsm is only non-NULL here if ownership was not yet moved into the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
952 953 954 955 956 957 958 959 960 961 962
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
 *
 * No-op when there is no FSM, when it has no FpGetSnapshotInfo callback, or when that callback
 * fails. commitIndex is never lowered.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // Zero-initialized and guarded on code: if ASSERT does not abort in this build, a failed
    // callback used to leave snapshot uninitialized and it was still read below.
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    ASSERT(code == 0);
    if (code == 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
      pSyncNode->commitIndex = snapshot.lastApplyIndex;
    }
  }
}

M
Minghao Li 已提交
963 964
/*
 * Start raft on an opened node.
 *
 * With more than one replica the node starts as a follower. A single-replica group has no one to
 * vote for it, so the node advances its term, becomes leader, appends a no-op and tries to advance
 * the commit index. The ping timer is started in both cases.
 */
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms: commit them through an entry of the
    // current term
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingRet == 0);
}

M
Minghao Li 已提交
982 983 984 985 986 987 988 989 990
/*
 * Start a node in stand-by mode.
 *
 * The node becomes a follower and its heartbeat timers are stopped. The election timer is
 * restarted with TIMER_MAX_MS (a timeout meant to be "long enough"), and the ping timer is started.
 */
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // park the election timer on the longest timeout
  int32_t electRet = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(electRet == 0);

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingRet == 0);
}

M
Minghao Li 已提交
997 998 999 1000 1001 1002 1003 1004
/*
 * Stop the election timer and the heartbeat timers, presumably ahead of syncNodeClose().
 * The ping timer is left running.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);      // no more elections
  syncNodeStopHeartbeatTimer(pSyncNode);  // no more heartbeats to peers
}

M
Minghao Li 已提交
1005
/*
 * Release a sync node and everything it owns. A NULL node is ignored.
 *
 * Order: the ping, elect and heartbeat timers are stopped first. Then the raft store, response
 * manager, vote and index managers, log store, raft config, FSM, snapshot senders and the new-node
 * snapshot receiver are destroyed, and finally pSyncNode itself is freed.
 * syncNodeOpen() also uses this to clean up a partially built node, so some members may never have
 * been created.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  sNTrace(pSyncNode, "sync close");

  // Stop the timers before tearing down state their callbacks may use (the callbacks are not
  // visible here). They used to be stopped only after the store, managers and cfg were destroyed.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1055
// option
M
Minghao Li 已提交
1056 1057
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1058
/* Snapshot strategy recorded in this node's raft config (pRaftCfg must be open). */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1059

M
Minghao Li 已提交
1060 1061 1062
// timer control --------------
/*
 * Arm the node's ping timer (pingTimerMS, callback FpPingTimerCB) and copy
 * pingTimerLogicClockUser into pingTimerLogicClock.
 * When the sync env is not initialized only an error is logged. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Disarm the ping timer. The user-side logic clock is advanced first, so it no longer equals the
 * value syncNodeStartPingTimer() copied into pingTimerLogicClock. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * The timer parameter is a freshly allocated SElectTimer carrying the node and the current
 * electTimerLogicClock.
 * Returns 0 on success, and also when the sync env is not initialized (that case is only logged).
 * Returns -1 if the SElectTimer cannot be allocated; the timer is then left unarmed.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    // the allocation used to be dereferenced unconditionally
    sError("vgId:%d, start elect timer error, out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;  // clock value at arming time
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Disarm the election timer. electTimerLogicClock is advanced first. A pending SElectTimer still
 * carries the old clock value (see syncNodeStartElectTimer), presumably so its callback can detect
 * that it is stale. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/*
 * Stop the election timer and re-arm it with a timeout of `ms` milliseconds.
 * Returns the result of syncNodeStartElectTimer(). Previously that result was dropped and 0 was
 * always returned, which hid a failed re-arm from callers.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  return syncNodeStartElectTimer(pSyncNode, ms);
}

M
Minghao Li 已提交
1116 1117
/*
 * Restart the election timer with a fresh timeout. Stand-by nodes get TIMER_MAX_MS; other nodes
 * get syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 * Returns the result of syncNodeRestartElectTimer().
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t lo = pSyncNode->electBaseLine;
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy ? TIMER_MAX_MS : syncUtilElectRandomMS(lo, 2 * lo);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return rc;
}

M
Minghao Li 已提交
1132
/*
 * Arm the node-level heartbeat timer (heartbeatTimerMS, callback FpHeartbeatTimerCB) and copy
 * heartbeatTimerLogicClockUser into heartbeatTimerLogicClock. When the sync env is not initialized
 * only an error is logged. The trace line is emitted in both cases. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1146
/*
 * Arm the per-peer heartbeat timer of every peer that has one (see syncHbTimerStart).
 * Peers without a timer are skipped. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }
  return 0;
}

M
Minghao Li 已提交
1164 1165
/*
 * Disarm the per-peer heartbeat timer of every peer that has one (see syncHbTimerStop).
 * Peers without a timer are skipped. Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }
  return 0;
}

1183 1184 1185 1186 1187 1188
/* Stop, then re-arm, all per-peer heartbeat timers. Results are ignored; always returns 0. */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1189 1190 1191
// utils --------------
/*
 * Send pMsg to the node identified by destRaftId through the node's syncSendMSg callback.
 * Before sending, the body is converted with syncUtilMsgHtoN() and the message is flagged
 * info.noResp = 1.
 * Returns 0 once the message has been handed to the callback, or -1 if no callback is installed.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // host -> network byte order
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the node described by nodeInfo through the node's syncSendMSg callback.
 * Before sending, the body is converted with syncUtilMsgHtoN() and the message is flagged
 * info.noResp = 1.
 * Returns 0 once the message has been handed to the callback, or -1 if no callback is installed.
 * This used to return 0 in that case, unlike syncNodeSendMsgById(), which hid the failure.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1222
/*
 * Report whether this node is a member of `config`.
 *
 * Membership is checked twice: by fqdn/port against myNodeInfo, and by the raft id derived from
 * fqdn/port and this node's vgId against myRaftId. Both answers are asserted to agree, and the
 * fqdn/port answer is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEp = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundByEp; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    foundByEp = (strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                (pInfo->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  bool foundById = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundById; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[idx].nodeFqdn, (config->nodeInfo)[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;
    foundById = syncUtilSameId(&candidate, &(pSyncNode->myRaftId));
  }

  ASSERT(foundByEp == foundById);
  return foundByEp;
}

1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261
/*
 * Report whether two sync configs differ in replica count, own index, or any member's fqdn/port.
 * Members are compared position by position, so a reordering counts as a change.
 */
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t k = 0; k < pOldCfg->replicaNum; ++k) {
    const SNodeInfo* pA = &pOldCfg->nodeInfo[k];
    const SNodeInfo* pB = &pNewCfg->nodeInfo[k];
    bool             sameEp = (strcmp(pA->nodeFqdn, pB->nodeFqdn) == 0) && (pA->nodePort == pB->nodePort);
    if (!sameEp) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1262
/*
 * Apply a new membership configuration recorded at lastConfigChangeIndex.
 *
 * If pNewConfig equals the current config, only a log line is written. Otherwise:
 *   - the new config and its index are stored in pSyncNode->pRaftCfg, and the index is added via
 *     raftCfgAddConfigIndex();
 *   - isStandBy is cleared when this node is in the new config, and set when it was in the old
 *     config but not in the new one;
 *   - if this node is in the new config, its own/peer/replica ids, quorum, index managers and vote
 *     managers are rebuilt. Snapshot senders of replicas present in both configs are carried over
 *     (and re-indexed), new slots get fresh senders, and leftover old senders are destroyed. The
 *     config is then persisted and the node re-enters its current role (a leader appends a no-op
 *     and tries to advance the commit index);
 *   - otherwise the config is only persisted.
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // the vgId used to be hard-coded as 1 in this message
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  // membership is judged via myNodeInfo / myRaftId, which still describe this node here
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  // The new config used to be rendered into oldCfgStr, so the logs showed the new config as both
  // "from" and "to", and newCfgStr stayed empty.
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders
    int32_t oldReplicaNum = pSyncNode->replicaNum;
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: carry over the sender of every replica that was already in the old config
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      // Only the first oldReplicaNum entries of oldReplicasId are live; later slots can still hold
      // ids from an earlier config because replicasId is never cleared past replicaNum. Senders
      // already handed out (NULL) are skipped, so a duplicate id can no longer lead to the NULL
      // dereference of senders[i] below.
      for (int32_t k = 0; k < oldReplicaNum; ++k) {
        if (oldSenders[k] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k])) {
          continue;
        }

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
        sNTrace(pSyncNode, "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

        (pSyncNode->senders)[i] = oldSenders[k];
        oldSenders[k] = NULL;
        bool reset = true;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                i, host, port, (pSyncNode->senders)[i], reset);
        break;  // at most one old sender per new replica
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);
        sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg (failure used to be silently ignored)
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg (failure used to be silently ignored)
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }
    sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
            pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1432 1433 1434 1435
// raft state change --------------
/*
 * Adopt a newer term observed from another node.
 * When `term` is strictly greater than the stored current term, persist it, drop back to
 * follower, and clear the vote. Otherwise nothing changes.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;  // not newer: nothing to do
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  // any vote recorded belonged to the previous term
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1443 1444 1445 1446 1447 1448
/*
 * Raise the stored current term to `term` if it is newer.
 * Unlike syncNodeUpdateTerm, this neither changes the node state nor clears the vote.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1449
/*
 * Step down to follower because a message carrying `newTerm` was seen.
 *  - newTerm <  current term: stale, ignored (trace only).
 *  - newTerm == current term: become follower if not already one; term and vote untouched.
 *  - newTerm >  current term: persist newTerm, become follower, clear the vote.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = pSyncNode->pRaftStore->currentTerm;

  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (newTerm == currentTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1475 1476
// Ask the response manager to clean up its outstanding client responses
// (invoked from syncNodeBecomeFollower when leaving leadership).
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1477
/*
 * Switch this node to follower state.
 * `debugStr` is a human-readable reason that only appears in the trace log.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    // syncNodeBecomeLeader pointed the leader cache at ourselves; that no longer holds
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  // respond to clients waiting on this node
  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL;
  if (hasCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1523
/*
 * Switch this node to leader state.
 *
 * Resets per-follower replication progress (nextIndex = last index + 1, matchIndex =
 * invalid), stops the election timer, starts heartbeating, and notifies the FSM.
 * `debugStr` is a human-readable reason that only appears in the trace log.
 */
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index accounts for the snapshot as well as the log, so it stays valid
  // even if WAL entries were deleted. It does not depend on the replica, so query it
  // once instead of once per replica.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  (void)code;  // only consumed by ASSERT

  // overwriting our own slot is harmless
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // a leader does not run elections; it heartbeats instead
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become leader %s", debugStr);
}

/*
 * Candidate -> leader transition after winning the election.
 * Precondition (asserted): node is a candidate holding a majority of granted votes.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // Raft 3.6.2: entries from earlier terms are committed indirectly, by appending an
  // entry (a no-op) in the leader's own term and committing that.
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1613 1614
// True when this sync node belongs to vgroup 1, which this module treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  bool isMnode = pSyncNode->vgId == 1;
  return isMnode;
}

M
Minghao Li 已提交
1615
/*
 * Reset the per-peer send bookkeeping for all TSDB_MAX_REPLICA slots:
 * no index sent yet, last send time 0. Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    slot++;
  }
  return 0;
}

// Follower -> candidate: flips the state only (asserts the node is currently a follower);
// timers and votes are not touched here.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower (asserts the node is currently leader).
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  const char* reason = "leader to follower";
  syncNodeBecomeFollower(pSyncNode, reason);
  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower (asserts the node is currently a candidate).
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  const char* reason = "candidate to follower";
  syncNodeBecomeFollower(pSyncNode, reason);
  sNTrace(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
1643 1644 1645

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1646
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1647 1648
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1649 1650 1651 1652

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1653
// simulate get vote from outside
M
Minghao Li 已提交
1654 1655 1656
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

M
Minghao Li 已提交
1657
  SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId);
M
Minghao Li 已提交
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  syncRequestVoteReplyDestroy(pMsg);
}

M
Minghao Li 已提交
1668
// snapshot --------------
M
Minghao Li 已提交
1669 1670

// return if has a snapshot
M
Minghao Li 已提交
1671 1672
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1673
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1674 1675
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1676 1677 1678 1679 1680 1681 1682
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1683 1684
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1685
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1686
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1687 1688
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1689 1690 1691 1692 1693 1694 1695
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1696 1697
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1698 1699
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1700 1701
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1702
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1703 1704
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1705 1706
    }

M
Minghao Li 已提交
1707 1708 1709
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1710 1711 1712 1713
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1714
  } else {
M
Minghao Li 已提交
1715 1716
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1717
  }
M
Minghao Li 已提交
1718

M
Minghao Li 已提交
1719 1720 1721 1722 1723 1724 1725
  return lastTerm;
}

/*
 * Fill *pLastIndex / *pLastTerm with the last index and term, snapshot included
 * (see syncNodeGetLastIndex / syncNodeGetLastTerm). Always returns 0.
 */
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1728

M
Minghao Li 已提交
1729
// return append-entries first try index
M
Minghao Li 已提交
1730 1731 1732 1733 1734
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1735 1736
// if index > 0, return index - 1
// else, return -1
1737 1738 1739 1740 1741 1742 1743 1744 1745
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1746 1747 1748 1749
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm        preTerm = 0;
  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
M
Minghao Li 已提交
1763 1764 1765 1766 1767 1768

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1769 1770 1771 1772 1773 1774
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  } else {
1775 1776 1777 1778
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1779 1780 1781 1782
      }
    }
  }

1783
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1784
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1785 1786
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1787 1788 1789 1790

/*
 * Fill *pPreIndex / *pPreTerm with the index and term of the entry preceding `index`
 * (see syncNodeGetPreIndex / syncNodeGetPreTerm). Always returns 0.
 */
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = preIndex;

  SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = preTerm;

  return 0;
}

M
Minghao Li 已提交
1795
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
    int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
      sNError(pNode, "failed to build ping msg");
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1807
    }
M
Minghao Li 已提交
1808

S
Shengliang Guan 已提交
1809 1810 1811 1812 1813 1814
    sNTrace(pNode, "enqueue ping msg");
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
      return;
1815 1816
    }

S
Shengliang Guan 已提交
1817
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1818
  } else {
1819
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
S
Shengliang Guan 已提交
1820
           pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1821 1822 1823 1824
  }
}

/*
 * Election timer callback (param is a heap-allocated SElectTimer).
 * Builds an election-timeout message stamped with the timer's logic clock and enqueues it.
 * The SElectTimer is freed on every path once syncIsInit() holds; the timer is not re-armed here.
 */
static void syncNodeEqElectTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  SElectTimer* pElectTimer = param;
  SSyncNode*   pNode = pElectTimer->pSyncNode;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);

  if (code != 0) {
    sNError(pNode, "failed to build elect msg");
  } else {
    SyncTimeout* pTimeout = rpcMsg.pCont;
    sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);

    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
    }
  }

  taosMemoryFree(pElectTimer);
}

M
Minghao Li 已提交
1863
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1864
  if (!syncIsInit()) return;
1865

S
Shengliang Guan 已提交
1866 1867 1868 1869 1870 1871 1872 1873 1874 1875
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sNError(pNode, "failed to build heartbeat msg");
        return;
1876
      }
M
Minghao Li 已提交
1877

S
Shengliang Guan 已提交
1878 1879 1880 1881 1882 1883
      sNTrace(pNode, "enqueue heartbeat timer");
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
        rpcFreeCont(rpcMsg.pCont);
        return;
1884
      }
S
Shengliang Guan 已提交
1885 1886 1887 1888

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1889
    } else {
S
Shengliang Guan 已提交
1890 1891
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1892
    }
M
Minghao Li 已提交
1893 1894 1895
  }
}

1896 1897 1898 1899 1900
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
1901 1902 1903 1904
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1905 1906 1907 1908
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

M
Minghao Li 已提交
1909 1910 1911 1912
  if (pSyncNode->pRaftStore == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1913
  // sNTrace(pSyncNode, "eq peer hb timer");
1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
1925
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
1926 1927 1928 1929 1930 1931 1932
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
1933 1934
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
1935 1936 1937 1938 1939 1940 1941
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
1942
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
1943 1944 1945 1946
      }
#endif

      // send msg
M
Minghao Li 已提交
1947
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
1948 1949 1950

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
1951 1952
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
1953 1954 1955 1956 1957 1958
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
1959
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock,
1960 1961 1962 1963 1964
             msgLogicClock);
    }
  }
}

1965 1966 1967 1968 1969
/*
 * Propose a no-op entry by enqueueing it as a client request.
 *
 * Only the leader may propose. Otherwise terrno = TSDB_CODE_SYN_NOT_LEADER and -1 is
 * returned. Returns 0 on success, -1 or the enqueue error code on failure.
 *
 * Fixes:
 *  - The leader check was inverted: it rejected the leader and accepted non-leaders.
 *  - A failed syncClientRequestBuildFromNoopEntry is no longer enqueued.
 *  - The message content is freed when enqueueing fails, as the timer enqueue paths do.
 */
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);
  if (code != 0) {
    sNError(pNode, "failed to build noop client request");
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

1989 1990 1991
// Deleter registered with the LRU cache by syncCacheEntry: frees the cached entry when
// the cache drops it. The key is not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

/*
 * Insert `pEntry` into the log store's LRU cache, keyed by its index, at low priority.
 * The charge is the entry header plus its payload. On success the cache owns the entry
 * (released through deleteCacheEntry) and *h receives a handle. Returns 0 on success,
 * -1 otherwise.
 */
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2003 2004 2005
/*
 * Build a no-op entry at the next write index in the current term and cache it. When this
 * node is leader, also append it to the log.
 *
 * Returns 0 on success, -1 if the entry cannot be built or appended.
 *
 * Fixes:
 *  - On append failure the cache handle or the entry is now released. Previously the
 *    early return leaked it.
 *  - A NULL entry is reported instead of only being ASSERTed.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    sNError(ths, "build noop entry error");
    return -1;
  }

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  int32_t ret = 0;
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sNError(ths, "append noop error");
      ret = -1;
    }
  }

  // If the cache accepted the entry (h != NULL) it owns it and we only drop our handle;
  // otherwise we still own pEntry and must free it. This runs on the error path too.
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042
/*
 * Handle a heartbeat from the leader.
 *
 * - Same term and we are not leader: reset the election timer and adopt the leader's
 *   minMatchIndex. Followers also enqueue a local follower-commit command carrying the
 *   leader's commit index.
 * - Term >= ours and we are not a follower: enqueue a local step-down command.
 * Always sends a heartbeat reply back to the sender. Returns 0.
 *
 * Fixes:
 *  - The follower-commit SyncLocalCmd is now destroyed; the step-down branch already
 *    destroyed its command.
 *  - The serialized local command is freed when no enqueue callback is set.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number; its meaning is not visible here

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // commit is deferred to the sync queue instead of calling syncNodeFollowerCommit here
      SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->fcIndex = pMsg->commitIndex;

      SRpcMsg rpcMsgLocalCmd;
      syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pSyncMsg->fcIndex);
        }
      } else {
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }

      syncLocalCmdDestroy(pSyncMsg);
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // step-down is deferred to the sync queue instead of calling syncNodeStepDown here
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

/*
 * Handle a heartbeat reply: record when the replying peer answered, which is used to
 * judge whether that peer is alive. Returns 0.
 *
 * Fix: syncNodeOnHeartbeat builds replies with destId = the heartbeat's sender (this
 * leader) and srcId = the replying node. The receive time therefore belongs under
 * pMsg->srcId. Keying it by destId recorded it against the leader itself.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->srcId), pMsg->startTime);

  return 0;
}

M
Minghao Li 已提交
2113
/*
 * Execute a local command that was queued through the sync message queue
 * (e.g. by syncNodeOnHeartbeat): step down to a new term, or advance the follower
 * commit index. Unknown commands are logged. Always returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pMsg->sdNewTerm);
    return 0;
  }

  if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pMsg->fcIndex);
    return 0;
  }

  sNError(ths, "error local cmd");
  return 0;
}

M
Minghao Li 已提交
2129 2130 2131 2132 2133 2134 2135 2136 2137 2138
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2139

2140
/*
 * Handle a client proposal.
 *
 * Wraps the request in a raft entry at the next write index with the current term and
 * caches it. When this node is leader, it appends the entry to the log, then either starts
 * replication (multi-replica) or advances the commit index directly (single replica).
 *
 * Returns 0 on success and -1 on failure. If pRetIndex is non-NULL, it receives the entry
 * index on success. It is set to SYNC_INDEX_INVALID when the entry cannot be built, and is
 * left untouched when the append fails.
 *
 * Fixes:
 *  - A NULL entry from the builders is handled instead of being dereferenced.
 *  - FpCommitCb is checked before the failure notification.
 *  - The triplicated cache/entry release is a single cleanup path.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t         ret = 0;
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sNError(ths, "failed to build raft entry from client request");
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  // if the cache accepts pEntry it owns it and h is our reference; otherwise we own pEntry
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1 && ths->pFsm->FpCommitCb != NULL) {
        // del resp mgr, call FpCommitCb: report the failed proposal (code = -1)
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }
      ret = -1;
      goto _END;
    }

    // if multi replica, start replicating right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = pEntry->index;
  }

_END:
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}
M
Minghao Li 已提交
2228

S
Shengliang Guan 已提交
2229 2230 2231
/*
 * Return a printable name for a sync node role.
 * Any value other than follower / candidate / leader yields "error".
 */
const char* syncStr(ESyncState state) {
  const char* name = "error";
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    name = "follower";
  } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
    name = "candidate";
  } else if (state == TAOS_SYNC_STATE_LEADER) {
    name = "leader";
  }
  return name;
}
2241

2242
/*
 * Act on a leader-transfer log entry.
 *
 * The request is ignored (returning 0) unless all of the following hold:
 * - this node is a follower;
 * - it has finished restoring;
 * - the entry's term is not below the current term;
 * - the entry's index is not below the last log index.
 *
 * If the requested new leader is this node (matched by raft id, or by fqdn + port), the election
 * timer is restarted with a 1 ms timeout so this node campaigns right away. In every accepted
 * case the FSM's FpLeaderTransferCb, if registered, is notified.
 *
 * @param ths      sync node
 * @param pRpcMsg  rpc message carrying the leader-transfer request
 * @param pEntry   raft log entry of the request (term/index/seqNum are read)
 * @return 0 when handled or skipped; -1 if the leader-transfer message cannot be decoded
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
  if (pSyncLeaderTransfer == NULL) {
    // nothing usable was decoded; bail out rather than dereferencing NULL below
    sNError(ths, "failed to decode leader transfer msg, index:%" PRId64, pEntry->index);
    return -1;
  }
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  // this node is the target if either its raft id or its fqdn:port matches the requested leader
  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  bool same = sameId || sameNodeInfo;
  if (same) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    // addr holds the value built by syncUtilAddr2U64 (unsigned 64-bit), so print it unsigned;
    // with PRId64 addresses whose top bit is set were logged as negative numbers
    sNTrace(ths, "maybe leader transfer to %s:%d %" PRIu64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
            pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

2307
/*
 * Find this node inside a new cluster configuration and store its position in pNewCfg->myIndex.
 * Each member's raft id is rebuilt from its fqdn:port plus this node's vgId and compared with
 * ths->myRaftId.
 * Returns 0 when found; -1 when this node is not a member (myIndex is left unchanged).
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t idx = 0; idx < pNewCfg->replicaNum; ++idx) {
    SRaftId candidate;
    candidate.vgId = ths->vgId;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[idx].nodeFqdn, pNewCfg->nodeInfo[idx].nodePort);

    if (!syncUtilSameId(&ths->myRaftId, &candidate)) {
      continue;
    }

    pNewCfg->myIndex = idx;
    return 0;
  }

  return -1;
}

2322 2323 2324 2325
/*
 * True when the message is a user-commit request on a single-replica group whose vgId is not 1.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2326
/*
 * Apply the committed log entries [beginIndex, endIndex] to the state machine.
 *
 * If the FSM reports a snapshot whose lastApplyIndex already covers beginIndex, the range is first
 * advanced past the snapshot. Each remaining entry is taken from the log cache, or from the log
 * store on a cache miss; entries that cannot be read are logged and skipped.
 *
 * User-commit entries are passed to FpCommitCb, after retrieving the pending response info from
 * the resp mgr. The exception is a node with replicaNum == 1, restore finished and vgId != 1,
 * where they are not executed here (presumably done by the propose path — verify).
 *
 * When the applied entry is the last log index and restore is not yet finished, FpRestoreFinishCb
 * is invoked and restoreFinish is set.
 *
 * @param flag  forwarded to FpCommitCb as SFsmCbMeta.flag
 * @return 0 (also for an empty range), -1 if ths is NULL
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first: entries covered by the snapshot need no replay
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // start from NULL so the pointer is never indeterminate, whatever the log store does
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          if (code != 0 || pEntry == NULL) {
            // this entry cannot be applied; report it and continue with the next index
            sNError(ths, "get log entry error");
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
            continue;
          }
        }

        SRpcMsg rpcMsg = {0};
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

            syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
          }
        }

        // leader-transfer entries (TDMT_SYNC_LEADER_TRANSFER) are executed in pre-commit, not here

        // restore finish
        // if only snapshot, a noop entry will be appended, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
            sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
      }
    }
  }
  return 0;
}

/*
 * Report whether pRaftId is one of the node's configured replicas (ths->replicasId).
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t idx = 0; idx < ths->replicaNum && !found; ++idx) {
    found = syncUtilSameId(&ths->replicasId[idx], pRaftId);
  }
  return found;
}

/*
 * Return the snapshot sender of the replica whose raft id equals pDestId, or NULL if none matches.
 * If several replicas match, the highest-indexed one wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  // scan from the back: the first hit is the slot a forward scan would have ended on
  for (int32_t idx = ths->replicaNum - 1; idx >= 0; --idx) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      return ths->senders[idx];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
2456

2457 2458
/*
 * Return a pointer to the peer heartbeat timer slot of the replica matching pDestId, or NULL.
 * If several replicas match, the highest-indexed one wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  // scan from the back: the first hit is the slot a forward scan would have ended on
  for (int32_t idx = ths->replicaNum - 1; idx >= 0; --idx) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      return &ths->peerHeartbeatTimerArr[idx];
    }
  }
  return NULL;
}

M
Minghao Li 已提交
2467 2468
/*
 * Return a pointer to the tracked send state of the replica matching pDestId, or NULL.
 * If several replicas match, the highest-indexed one wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  // scan from the back: the first hit is the slot a forward scan would have ended on
  for (int32_t idx = ths->replicaNum - 1; idx >= 0; --idx) {
    if (syncUtilSameId(pDestId, &ths->replicasId[idx])) {
      return &ths->peerStates[idx];
    }
  }
  return NULL;
}

/*
 * Decide whether an append-entries message should be sent to pDestId.
 *
 * Returns false when the peer is unknown (logged as a possibly dropped replica). It also returns
 * false when the same index (prevLogIndex + 1) was last sent less than
 * SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Otherwise returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  // suppress a duplicate send of the same index while the previous one is still within the timeout
  bool duplicate = (pPeer->lastSendIndex == nextIndex) && (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !duplicate;
}

M
Minghao Li 已提交
2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507
/*
 * Check whether a configuration change may start now.
 *
 * It may not when:
 * - a change is already in progress;
 * - the commit index (once valid) has not caught up with the last log index;
 * - a snapshot sender to any peer is running.
 *
 * Each refusal is logged with a distinct message.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool commitValid = pSyncNode->commitIndex >= SYNC_INDEX_BEGIN;
  if (commitValid && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[idx]);
    bool                 sending = (pSender != NULL) && pSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}

2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531
/*
 * Printable name of a sync timer type; "unknown" for anything other than ping/election/heartbeat.
 */
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  switch (timerType) {
    case SYNC_TIMEOUT_PING:
      return "ping";
    case SYNC_TIMEOUT_ELECTION:
      return "elect";
    case SYNC_TIMEOUT_HEARTBEAT:
      return "heartbeat";
    default:
      return "unknown";
  }
}

/*
 * Trace a received sync-timer message: timer type, logic clock, period in ms and data pointer,
 * followed by the caller-supplied suffix s.
 */
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
          syncTimerTypeStr(pMsg->timeoutType),
          pMsg->logicClock,
          pMsg->timerMS,
          pMsg->data,
          s);
}

wafwerar's avatar
wafwerar 已提交
2536
/*
 * Trace a received request-vote message.
 * The sender address is decoded from pMsg->srcId.addr into host:port. The trace also carries the
 * candidate's term, last log index and last log term, followed by the suffix s.
 * (The unused 256-byte logBuf local of the previous version has been removed.)
 */
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
  sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
          host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}

wafwerar's avatar
wafwerar 已提交
2545
/*
 * Trace an outgoing request-vote reply.
 * Includes the destination host:port (decoded from destId), the term and whether the vote was
 * granted.
 */
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s",
          peerHost, peerPort, pMsg->term, pMsg->voteGranted, s);
}

wafwerar's avatar
wafwerar 已提交
2553
/*
 * Trace a received request-vote reply.
 * Includes the source host:port (decoded from srcId), the term and whether the vote was granted.
 */
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s",
          peerHost, peerPort, pMsg->term, pMsg->voteGranted, s);
}

wafwerar's avatar
wafwerar 已提交
2561
/*
 * Trace an outgoing append-entries message.
 * Includes the destination host:port (decoded from destId) plus:
 * - term;
 * - previous log index and term;
 * - private term;
 * - commit index;
 * - payload length.
 */
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
          ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
          peerHost, peerPort,
          pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm,
          pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen,
          s);
}

wafwerar's avatar
wafwerar 已提交
2572
/*
 * Trace a received append-entries message.
 * Includes the source host:port (decoded from srcId) plus:
 * - term;
 * - previous log index and term;
 * - commit index;
 * - private term;
 * - payload length.
 */
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
          ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
          peerHost, peerPort,
          pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm,
          pMsg->commitIndex, pMsg->privateTerm, pMsg->dataLen,
          s);
}

2584
/*
 * Trace an outgoing append-entries reply.
 * Includes the destination host:port (decoded from destId), term, private term, the success flag
 * and the match index.
 */
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
          "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}

wafwerar's avatar
wafwerar 已提交
2595
/*
 * Trace a received append-entries reply.
 * Includes the source host:port (decoded from srcId), term, private term, the success flag and the
 * match index.
 */
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
          "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
2605 2606 2607 2608 2609

/*
 * Trace an outgoing heartbeat.
 * Includes the destination host:port (decoded from destId), term, commit index, minimum match
 * index and private term.
 */
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
          "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

/*
 * Trace a received heartbeat.
 * Includes the source host:port (decoded from srcId), term, commit index, minimum match index and
 * private term.
 */
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode,
          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
          "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

/*
 * Trace an outgoing heartbeat reply.
 * Includes the destination host:port (decoded from destId), the term and the private term.
 * The address is the destination, so the message says "to" like every other send-side logger.
 * It previously said "from", which made outgoing replies look like incoming ones.
 */
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);

  sNTrace(pSyncNode, "send sync-heartbeat-reply to %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
          pMsg->term, pMsg->privateTerm, s);
}

/*
 * Trace a received heartbeat reply.
 * Includes the source host:port (decoded from srcId), the term and the private term.
 */
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->privateTerm, s);
}
2644 2645

/*
 * Trace a received local command.
 * Includes the command code and its name (syncLocalCmdGetStr), the sd-new-term and the fc-index.
 */
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd),
          pMsg->sdNewTerm,
          pMsg->fcIndex,
          s);
}

/*
 * Trace an outgoing pre-snapshot message: destination host:port (decoded from destId) and term.
 */
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s",
          peerHost, peerPort, pMsg->term, s);
}

/*
 * Trace a received pre-snapshot message: source host:port (decoded from srcId) and term.
 */
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s",
          peerHost, peerPort, pMsg->term, s);
}

/*
 * Trace an outgoing pre-snapshot reply.
 * Includes the destination host:port (decoded from destId), the term and the snapshot start index.
 */
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->snapStart, s);
}

/*
 * Trace a received pre-snapshot reply.
 * Includes the source host:port (decoded from srcId), the term and the snapshot start index.
 */
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          peerHost, peerPort, pMsg->term, pMsg->snapStart, s);
}
M
Minghao Li 已提交
2679 2680 2681 2682 2683 2684 2685 2686

/* No-op: outgoing snapshot-send messages are not traced. */
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* No-op: received snapshot-send messages are not traced. */
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* No-op: outgoing snapshot responses are not traced. */
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* No-op: received snapshot responses are not traced. */
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}