syncMain.c 85.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
M
Minghao Li 已提交
48

49
/*
 * Open a sync node described by pSyncInfo and register it.
 *
 * Returns the node's reference id (as assigned by syncNodeAdd), or -1 when the
 * node cannot be opened or registered. On registration failure the node is
 * closed again. Timer base lines and the message callback are copied from
 * pSyncInfo only after a successful registration.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping / election / heartbeat timing
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  // message callbacks supplied by the owner
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
72 73 74 75
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
76 77 78
  }
}

M
Minghao Li 已提交
79
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
80
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
81
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
82
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
83
    syncNodeRemove(rid);
M
Minghao Li 已提交
84
  }
S
Shengliang Guan 已提交
85
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
89 90 91 92
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
93 94
}

S
Shengliang Guan 已提交
95 96 97
/*
 * A new configuration is acceptable when this node is part of it and the
 * replica count changes by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
100
/*
 * Apply a new replica configuration to the node identified by rid.
 *
 * Returns 0 on success. Returns -1 when rid is unknown, or when pNewCfg is
 * rejected by syncNodeCheckNewConfig (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 *
 * If the node is the leader, its per-peer heartbeat timers are stopped,
 * re-initialized against the (new) replicasId list and restarted, and a
 * replication round is triggered.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: pSyncNode must not be touched after release.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
128

S
Shengliang Guan 已提交
129 130 131 132
/*
 * Dispatch an incoming sync RPC message to the handler for its msgType.
 *
 * Returns -1 when the sync module is not initialized, rid is unknown, or the
 * message type is not recognized. Otherwise it returns the handler's result.
 * Heartbeat, append-entries-reply, snapshot and local-cmd messages are decoded
 * into a temporary struct that is destroyed after handling.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    code = syncNodeOnTimer(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    // Propagate the handler result; previously it was dropped and code stayed -1.
    code = syncNodeOnRequestVote(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    // Propagate the handler result; previously it was dropped and code stayed -1.
    code = syncNodeOnAppendEntries(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);
  } else {
    sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
179
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
180
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
181
  if (pSyncNode == NULL) return -1;
182

S
Shengliang Guan 已提交
183
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
184
  syncNodeRelease(pSyncNode);
185 186 187
  return ret;
}

M
Minghao Li 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
/*
 * Smallest match index recorded in pMatchIndex across all peers.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // The first peer seeds the minimum; later peers only lower it.
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

204
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
205
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
206
  if (pSyncNode == NULL) {
207
    sError("sync begin snapshot error");
208 209
    return -1;
  }
210

211 212
  int32_t code = 0;

M
Minghao Li 已提交
213
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
214 215 216
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
217 218 219
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
220 221 222
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
223 224
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
225
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
226 227 228
      return 0;
    }

M
Minghao Li 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
246 247 248 249
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
250 251
            } while (0);

S
Shengliang Guan 已提交
252
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
253 254 255 256 257 258
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
259 260 261
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
262
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
263 264 265 266
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
267
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
268
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
269 270 271
        return 0;

      } else {
S
Shengliang Guan 已提交
272
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
273
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
274 275 276 277 278 279 280 281 282
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
283 284 285
    }
  }

M
Minghao Li 已提交
286
_DEL_WAL:
287

M
Minghao Li 已提交
288
  do {
289 290 291 292
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
293
      pSyncNode->snapshottingTime = taosGetTimestampMs();
294

M
Minghao Li 已提交
295 296 297
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
S
Shengliang Guan 已提交
298 299
        sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
300
      } else {
S
Shengliang Guan 已提交
301
        sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
S
Shengliang Guan 已提交
302
                terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
303 304
        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
305 306

    } else {
S
Shengliang Guan 已提交
307 308
      sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
              snapshottingIndex, lastApplyIndex);
309
    }
M
Minghao Li 已提交
310
  } while (0);
311

S
Shengliang Guan 已提交
312
  syncNodeRelease(pSyncNode);
313 314 315 316
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
317
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
318
  if (pSyncNode == NULL) {
319
    sError("sync end snapshot error");
320 321 322
    return -1;
  }

323 324 325 326
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
327
    if (code != 0) {
328
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
329
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
330 331
      return -1;
    } else {
S
Shengliang Guan 已提交
332
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
333 334
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
335
  }
336

S
Shengliang Guan 已提交
337
  syncNodeRelease(pSyncNode);
338 339 340
  return code;
}

M
Minghao Li 已提交
341
/*
 * Make the node identified by rid step down to newTerm via syncNodeStepDown.
 * Returns 0, or -1 when rid is unknown.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);
  return 0;
}

353 354 355
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
356
    sError("sync ready for read error");
357 358 359 360 361 362 363 364 365 366
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
367 368 369
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
370

371 372 373 374
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
        SSyncRaftEntry* pEntry = NULL;
        int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(
S
Shengliang Guan 已提交
375
                    pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
376 377 378 379 380 381 382
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

          syncEntryDestory(pEntry);
        }
383 384 385 386
      }
    }
  }

387 388 389 390 391 392 393 394
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

395 396 397 398
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
399 400
/*
 * Transfer leadership from pSyncNode to its first peer (peersNodeInfo[0]).
 *
 * Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has no
 * peers. Returns 0 without doing anything when the node is not the leader.
 * Otherwise returns the syncNodeLeaderTransferTo result.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[0]);
}

/*
 * Propose a leader-transfer entry naming newLeader as the next leader.
 *
 * Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA for a single-replica
 * group, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the transfer message
 * cannot be built, otherwise the syncNodePropose result.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Check before the first dereference (the old ASSERT ran after pMsg was already used).
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to build leader transfer msg", pSyncNode->vgId);
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

437 438
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
439

S
Shengliang Guan 已提交
440
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
441 442 443 444
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
445 446
  }

447
  return state;
M
Minghao Li 已提交
448 449
}

450
#if 0
// NOTE: the snapshot-lookup helpers below are compiled out (#if 0) and are kept
// for reference only; nothing in this file calls them while disabled.

// Fill pSnapshot with the index/term of log entry `index` and the config index
// in effect at that point. Returns 0 on success, -1 on failure.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

// Report the latest config index recorded in pRaftCfg.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

// Report the config index in effect at snapshotIndex.
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  // Same search as syncNodeGetSnapshotConfigIndex(); reuse it instead of duplicating the loop.
  sMeta->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, snapshotIndex);
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
521

522 523 524 525
/*
 * Return the config index in effect for a snapshot whose last applied index is
 * snapshotLastApplyIndex: the largest entry of pRaftCfg->configIndexArr that is
 * greater than configIndexArr[0] and not beyond snapshotLastApplyIndex, or
 * configIndexArr[0] when no such entry exists. Asserts that at least one
 * config index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  const SyncIndex* indexes = pSyncNode->pRaftCfg->configIndexArr;
  int32_t          numIndexes = pSyncNode->pRaftCfg->configIndexCount;
  SyncIndex        best = indexes[0];

  for (int32_t k = 0; k < numIndexes; ++k) {
    SyncIndex candidate = indexes[k];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

S
Shengliang Guan 已提交
538
/*
 * Fill pEpSet with the endpoints of every replica in the node's raft config.
 * numOfEps is reset to 0 first, so an unknown rid yields an empty set.
 * inUse points at the entry after cfg.myIndex, wrapping around.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  int32_t idx = 0;
  while (idx < pSyncNode->pRaftCfg->cfg.replicaNum) {
    SEp* pEp = pEpSet->eps + idx;
    tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pSyncNode->pRaftCfg->cfg.nodeInfo[idx].nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, idx, pEp->fqdn, pEp->port);
    ++idx;
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
558

M
Minghao Li 已提交
559
/*
 * Propose pMsg on the node identified by rid.
 * Returns -1 for an unknown rid, otherwise the syncNodePropose result.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak);
  syncNodeRelease(pNode);
  return result;
}

571
/*
 * Propose pMsg through the raft log of pSyncNode.
 *
 * Returns:
 *  -1 when the node is not leader (terrno = TSDB_CODE_SYN_NOT_LEADER).
 *  -1 when a non-vgId-1 node has not finished restoring
 *     (terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *  -1 when the optimized single-replica path fails
 *     (terrno = TSDB_CODE_SYN_INTERNAL_ERROR).
 *   1 when the optimized single-replica path succeeds; pMsg->info.conn gets
 *     the applied index and current term.
 *  Otherwise, the result of the serialize/enqueue path: -1 on serialize
 *     failure, else the syncEqMsg result. The response stub registered in
 *     pSyncRespMgr is removed again when either step fails.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log never reads an indeterminate value if
    // syncNodeOnClientRequest fails before assigning it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  return code;
}

S
Shengliang Guan 已提交
624
/*
 * Reset a per-peer heartbeat timer slot to target destId with period
 * hbBaseLine and callback syncNodeEqPeerHeartbeatTimer. Nothing is scheduled
 * here; syncHbTimerStart arms the timer. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
634
/*
 * Arm a per-peer heartbeat timer previously set up by syncHbTimerInit.
 *
 * A fresh SSyncHbTimerData is allocated, capturing the node, timer, destId and
 * current logicClock. It is stored in pSyncTimer->pData and passed to
 * taosTmrReset as the callback argument.
 *
 * Returns 0 when the timer was armed, and also (after logging) when the sync
 * environment is not initialized. Returns -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY if the callback data cannot be allocated.
 *
 * NOTE(review): a previous pSyncTimer->pData is overwritten without being
 * freed (syncHbTimerStop does not free it either) - confirm ownership.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(*pData));
  if (pData == NULL) {
    // Previously dereferenced unchecked.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
651
// Disarm one peer heartbeat timer. Always returns 0.
// The logic clock is advanced before the OS timer is stopped, so it no longer equals the clock value captured in the
// payload by syncHbTimerStart().
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // NOTE(review): pSyncTimer->pData is deliberately not freed here; ownership of the payload presumably stays with
  // the timer callback — confirm.
  return 0;
}

S
Shengliang Guan 已提交
660 661
// Create and initialize a sync node from pSyncInfo.
//
// Steps:
//   - Ensure the data directory exists.
//   - Create raft_config.json from the supplied config, or, if the file exists, reconcile it with the supplied
//     config (the input wins when it is non-empty and differs; otherwise the file wins).
//   - Copy paths and callbacks.
//   - Load the raft config and derive my/peer/replica raft ids.
//   - Take ownership of pSyncInfo->pFsm.
//   - Open the raft store, vote managers, index managers and log store.
//   - Seed commitIndex from the FSM snapshot.
//   - Initialize timers, the response manager and the snapshot senders/receiver.
//
// pSyncInfo->syncCfg may be overwritten with the config read from disk. On success pSyncInfo->pFsm is set to NULL,
// because the node now owns it. Returns the node, or NULL on failure. On failure the FSM is freed, and everything
// already created is released through syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId first so every log line below, including the early error paths, reports the right vgroup
  // (previously it was assigned only after the cfg file handling and those logs printed vgId:0).
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
      // pSyncNode->pRaftCfg has not been opened on this branch (it is still NULL from the calloc above), so read
      // the config back from the file just written instead of dereferencing the NULL pointer.
      pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
      if (pSyncNode->pRaftCfg == NULL) {
        sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
      raftCfgClose(pSyncNode->pRaftCfg);
      pSyncNode->pRaftCfg = NULL;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  // Bounded, always NUL-terminated copy (memcpy of sizeof(dst) bytes could over-read a shorter source).
  snprintf(pSyncNode->path, sizeof(pSyncNode->path), "%s", pSyncInfo->path);
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; the node takes ownership of the FSM
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // Entries up to the FSM's last applied index are already applied, so start commitIndex there.
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  sNTrace(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // The FSM is still owned by pSyncInfo if we failed before the hand-over above; syncNodeClose frees it otherwise.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
948 949 950 951 952 953 954 955 956 957 958
// Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when that index is larger.
// Does nothing if the node has no FSM or the FSM has no FpGetSnapshotInfo callback.
// If FpGetSnapshotInfo fails, the error is logged and commitIndex is left unchanged. Previously an uninitialized
// snapshot was read in that case whenever ASSERT is compiled out.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
959 960
// Start raft on an opened node.
//  - Multi-replica groups begin as followers.
//  - A single-replica group advances its term, becomes leader immediately, then appends a no-op and tries to
//    advance the commit index.
// Finally, the ping timer is armed.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingCode == 0);
}

M
Minghao Li 已提交
978 979 980 981 982 983 984 985 986
// Start the node as a standby follower.
// Heartbeats are stopped, the election timer is re-armed with TIMER_MAX_MS (the longest timeout), and the ping
// timer is started.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
993 994 995 996 997 998 999 1000
// Stop the election and heartbeat timers. The ping timer is left running; syncNodeClose() stops it.
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);      // no further election timeouts
  syncNodeStopHeartbeatTimer(pSyncNode);  // no further peer heartbeats
}

M
Minghao Li 已提交
1001
// Release a sync node and everything it owns.
// Accepts NULL, and a partially initialized node, since it is also the error path of syncNodeOpen().
//
// All timers are stopped BEFORE any other state is released. The timer callbacks receive pSyncNode (directly,
// or inside the elect/heartbeat payloads), so they must be disarmed before the raft store, managers and log store
// they may reach are destroyed. Previously the timers were stopped only after that teardown.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  sNTrace(pSyncNode, "sync close");

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1051
// option
M
Minghao Li 已提交
1052 1053
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1054
// Snapshot strategy recorded in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  const ESyncStrategy strategy = pSyncNode->pRaftCfg->snapshotStrategy;
  return strategy;
}
M
Minghao Li 已提交
1055

M
Minghao Li 已提交
1056 1057 1058
// timer control --------------
// Arm the ping timer: schedule FpPingTimerCB after pingTimerMS with pSyncNode as its argument, then copy the "user"
// logic clock into the timer's logic clock. If the sync environment is not initialized, only an error is logged.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Disarm the ping timer. The "user" logic clock is bumped first, so it no longer equals the value
// syncNodeStartPingTimer() copied into pingTimerLogicClock. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
// The callback receives a freshly allocated SElectTimer carrying the node and the current election logic clock.
// Returns 0 on success, and also when the sync environment is not initialized (that case is only logged).
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the payload cannot be allocated. Previously the malloc
// result was dereferenced unchecked.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(*pElectTimer));
  if (pElectTimer == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Disarm the election timer. The election logic clock is advanced first, so it no longer equals the value captured
// in the SElectTimer payload by syncNodeStartElectTimer(). Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and arm it again with a timeout of `ms` milliseconds.
// Returns the result of syncNodeStartElectTimer(), so a failure to re-arm reaches the caller instead of being
// swallowed (previously this always returned 0).
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);  // always succeeds
  return syncNodeStartElectTimer(pSyncNode, ms);
}

M
Minghao Li 已提交
1112 1113
// Re-arm the election timer.
//  - Standby nodes get TIMER_MAX_MS.
//  - Every other node gets a random timeout from syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = TIMER_MAX_MS;
  if (!pSyncNode->pRaftCfg->isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  }

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return code;
}

M
Minghao Li 已提交
1128
// Arm the node-wide heartbeat timer (FpHeartbeatTimerCB after heartbeatTimerMS, argument pSyncNode) and copy the
// "user" heartbeat logic clock into the timer clock. If the sync environment is not initialized, only an error is
// logged. The trace line is emitted in both cases. Always returns 0.
// NOTE(review): in this file the only call site is inside `#if 0`.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1142
// Arm the heartbeat timer of every peer that has one (see syncHbTimerStart). Always returns 0.
// The node-wide pHeartbeatTimer is not started here; that path was disabled with `#if 0`.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }
  return 0;
}

M
Minghao Li 已提交
1160 1161
// Disarm the heartbeat timer of every peer that has one (see syncHbTimerStop). Always returns 0.
// The node-wide pHeartbeatTimer is not stopped here; that path was disabled with `#if 0`.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }
  return 0;
}

1179 1180 1181 1182 1183 1184
// Stop and then re-arm all peer heartbeat timers. The individual results are not inspected; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1185 1186 1187
// utils --------------
// Send pMsg to the node identified by destRaftId through the node's syncSendMSg callback.
// The message body is converted with syncUtilMsgHtoN and marked no-response before sending.
// Returns 0 after handing the message to the callback, or -1 if no send callback is installed.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // host -> network byte order
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the node described by nodeInfo through the node's syncSendMSg callback.
// The message body is converted with syncUtilMsgHtoN and marked no-response before sending.
// Returns 0 after handing the message to the callback, or -1 if no send callback is installed. This matches
// syncNodeSendMsgById(); previously this variant reported success even though nothing was sent.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // host -> network byte order
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1218
// Tell whether this node is a member of `config`.
// Membership is computed twice, as a consistency check:
//   - by comparing fqdn:port with myNodeInfo;
//   - by comparing the derived raft id (addr + vgId) with myRaftId.
// The two results are ASSERTed to agree, and the first one is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool byEndpoint = false;
  for (int32_t idx = 0; idx < config->replicaNum && !byEndpoint; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    byEndpoint = strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
                 pInfo->nodePort == pSyncNode->myNodeInfo.nodePort;
  }

  bool byRaftId = false;
  for (int32_t idx = 0; idx < config->replicaNum && !byRaftId; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(config->nodeInfo[idx].nodeFqdn, config->nodeInfo[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;
    byRaftId = syncUtilSameId(&candidate, &pSyncNode->myRaftId);
  }

  ASSERT(byEndpoint == byRaftId);
  return byEndpoint;
}

1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
// Return true when the two configs differ in replica count, self index, or any replica's fqdn/port.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  const bool sameShape = pOldCfg->replicaNum == pNewCfg->replicaNum && pOldCfg->myIndex == pNewCfg->myIndex;
  if (!sameShape) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];
    if (pBefore->nodePort != pAfter->nodePort || strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) != 0) {
      return true;
    }
  }
  return false;
}

M
Minghao Li 已提交
1258
// Apply a new membership configuration that became effective at log index lastConfigChangeIndex.
//
// If pNewConfig equals the current config, nothing happens. Otherwise:
//   - The raft config is replaced and lastConfigChangeIndex is recorded.
//   - isStandBy is cleared when this node is in the new config, and set when it was in the old one but not the new.
//   - If this node is in the new config:
//       - self/peer/replica ids, quorum, and the index and vote managers are rebuilt;
//       - snapshot senders of replicas present in both configs are carried over, others are created or destroyed;
//       - the config is persisted, and the node re-enters its current role (leader or follower).
//   - Otherwise the config is only persisted.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save the old membership and snapshot senders so senders can be carried over below
    int32_t oldReplicaNum = pSyncNode->replicaNum;
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear the slots first
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // Carry over the sender of every new replica that was also in the old config.
    //  - Only the first oldReplicaNum entries of oldReplicasId belong to the old config; entries past that are not
    //    set by the config that was just replaced, so they are not searched.
    //  - Each old sender is handed over at most once: a taken slot is NULLed and skipped.
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      for (int32_t k = 0; k < oldReplicaNum; ++k) {
        if (oldSenders[k] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k])) {
          continue;
        }

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
        sNTrace(pSyncNode, "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

        (pSyncNode->senders)[i] = oldSenders[k];
        oldSenders[k] = NULL;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                i, host, port, (pSyncNode->senders)[i], 1);
        break;
      }
    }

    // create new senders for slots that were not carried over
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
      }
    }

    // free old senders that were not carried over
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);
        sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
            pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1428 1429 1430 1431
// raft state change --------------
/*
 * Adopt a newer term: persist it, turn into a follower and clear the vote
 * recorded for the old term. A term that is not newer than the current one
 * is ignored.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1439 1440 1441 1442 1443 1444
/*
 * Raise the persisted term to `term` when it is newer. Unlike
 * syncNodeUpdateTerm, the node's role and recorded vote are left untouched.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (pSyncNode->pRaftStore->currentTerm < term) {
    raftStoreSetTerm(pSyncNode->pRaftStore, term);
  }
}

M
Minghao Li 已提交
1445
/*
 * Step down in response to a message carrying `newTerm`.
 * - newTerm older than ours: ignored (trace only).
 * - newTerm newer: persist it, become follower and clear the vote.
 * - same term: become follower unless already one.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm curTerm = pSyncNode->pRaftStore->currentTerm;

  if (curTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (curTerm == newTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1471 1472
/*
 * Hand the node's response manager to syncRespCleanRsp. It is called when
 * leadership changes; presumably it answers or cleans up pending client
 * requests (TODO confirm against syncRespMgr).
 */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1473
/*
 * Switch this node to the follower role.
 * Steps, in order:
 * - Forget the cached leader id if we were the leader.
 * - Set the state, stop the heartbeat timer and restart the election timer.
 * - Notify the response manager and invoke FpBecomeFollowerCb (if set).
 * - Reset minMatchIndex.
 * debugStr is only used in the trace message.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);   // followers do not drive heartbeats
  syncNodeResetElectTimer(pSyncNode);      // restart the election countdown

  syncNodeLeaderChangeRsp(pSyncNode);      // answer pending client requests

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1519
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1520 1521
  pSyncNode->leaderTime = taosGetTimestampMs();

1522 1523 1524
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1525
  // state change
M
Minghao Li 已提交
1526
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1527 1528

  // set leader cache
M
Minghao Li 已提交
1529 1530
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1531
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1532 1533
    // maybe overwrite myself, no harm
    // just do it!
1534 1535 1536 1537 1538 1539 1540 1541 1542

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1543 1544
  }

S
Shengliang Guan 已提交
1545
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1546 1547
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1548 1549 1550
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1551 1552 1553
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1554
#if 0
1555 1556
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1557
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1558
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1559 1560 1561
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
1562
    }
1563
    (pMySender->privateTerm) += 100;
1564
  }
M
Minghao Li 已提交
1565
#endif
1566

1567 1568 1569 1570 1571
  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

M
Minghao Li 已提交
1572
  // stop elect timer
M
Minghao Li 已提交
1573
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1574

M
Minghao Li 已提交
1575 1576
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1577

M
Minghao Li 已提交
1578 1579
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1580

1581 1582 1583 1584 1585
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1586 1587 1588
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

M
Minghao Li 已提交
1589
  // trace log
S
Shengliang Guan 已提交
1590
  sNTrace(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1591 1592 1593
}

/*
 * Promote a candidate that holds a majority of granted votes to leader.
 * It then appends a no-op entry in the new term (Raft 3.6.2, committing
 * entries from previous terms) and tries to advance the commit index.
 * Replication starts right away when there is more than one replica.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1609 1610
/* True when this sync node belongs to vgroup 1, which this file treats as the mnode. */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = pSyncNode->vgId == 1;
  return isMnode;
}

M
Minghao Li 已提交
1611
/*
 * Reset the per-peer send bookkeeping for all TSDB_MAX_REPLICA slots:
 * lastSendIndex becomes SYNC_INDEX_INVALID and lastSendTime becomes 0.
 * Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }
  return 0;
}

/* Move a follower into the candidate state. Only the state field changes here. */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

/* Demote the current leader to follower via syncNodeBecomeFollower. */
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  sNTrace(pSyncNode, "leader to follower");
}

/* Return a candidate to the follower role via syncNodeBecomeFollower. */
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  sNTrace(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
1639 1640 1641

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1642
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1643 1644
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1645 1646 1647 1648

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1649
// simulate get vote from outside
M
Minghao Li 已提交
1650
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1651
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1652

S
Shengliang Guan 已提交
1653 1654
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1655
  if (ret != 0) return;
S
Shengliang Guan 已提交
1656 1657

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1658 1659 1660 1661 1662 1663 1664
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1665
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1666 1667
}

M
Minghao Li 已提交
1668
// return if has a snapshot
M
Minghao Li 已提交
1669 1670
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1671
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1672 1673
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1674 1675 1676 1677 1678 1679 1680
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1681 1682
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1683
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1684
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1685 1686
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1687 1688 1689 1690 1691 1692 1693
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1694 1695
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1696 1697
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1698 1699
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1700
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1701 1702
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1703 1704
    }

M
Minghao Li 已提交
1705 1706 1707
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1708 1709 1710 1711
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1712
  } else {
M
Minghao Li 已提交
1713 1714
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1715
  }
M
Minghao Li 已提交
1716

M
Minghao Li 已提交
1717 1718 1719 1720 1721 1722 1723
  return lastTerm;
}

/*
 * Fill *pLastIndex and *pLastTerm with the last index/term, both of which
 * account for the snapshot (see syncNodeGetLastIndex / syncNodeGetLastTerm).
 * Always returns 0.
 */
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1726

M
Minghao Li 已提交
1727
// return append-entries first try index
M
Minghao Li 已提交
1728 1729 1730 1731 1732
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1733 1734
// if index > 0, return index - 1
// else, return -1
1735 1736 1737 1738 1739 1740 1741 1742 1743
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1744 1745 1746 1747
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm        preTerm = 0;
  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
M
Minghao Li 已提交
1761 1762 1763 1764 1765 1766

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1767 1768 1769 1770 1771 1772
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  } else {
1773 1774 1775 1776
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1777 1778 1779 1780
      }
    }
  }

1781
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1782
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1783 1784
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1785 1786 1787 1788

/*
 * Fill *pPreIndex / *pPreTerm with the index and term of the entry preceding
 * `index` (see syncNodeGetPreIndex / syncNodeGetPreTerm). Always returns 0.
 */
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIndex;
  *pPreTerm = preTerm;
  return 0;
}

M
Minghao Li 已提交
1793
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
    int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
      sNError(pNode, "failed to build ping msg");
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1805
    }
M
Minghao Li 已提交
1806

S
Shengliang Guan 已提交
1807 1808 1809 1810 1811 1812
    sNTrace(pNode, "enqueue ping msg");
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
      return;
1813 1814
    }

S
Shengliang Guan 已提交
1815
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1816
  } else {
1817
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
S
Shengliang Guan 已提交
1818
           pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1819 1820 1821 1822
  }
}

/*
 * Election timer callback. `param` is a heap-allocated SElectTimer carrying
 * the node and the logic clock captured when the timer was armed. It builds
 * and enqueues a SYNC_TIMEOUT_ELECTION message. The SElectTimer payload is
 * freed on every path; the message payload is freed if enqueueing fails.
 */
static void syncNodeEqElectTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  SElectTimer* pElectTimer = param;
  SSyncNode*   pNode = pElectTimer->pSyncNode;

  SRpcMsg electMsg = {0};
  int32_t rc = syncTimeoutBuild(&electMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);

  if (rc == 0) {
    SyncTimeout* pTimeout = electMsg.pCont;
    sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);

    rc = pNode->syncEqMsg(pNode->msgcb, &electMsg);
    if (rc != 0) {
      sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
      rpcFreeCont(electMsg.pCont);
    }
  } else {
    sNError(pNode, "failed to build elect msg");
  }

  taosMemoryFree(pElectTimer);
}

M
Minghao Li 已提交
1861
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1862
  if (!syncIsInit()) return;
1863

S
Shengliang Guan 已提交
1864 1865 1866 1867 1868 1869 1870 1871 1872 1873
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sNError(pNode, "failed to build heartbeat msg");
        return;
1874
      }
M
Minghao Li 已提交
1875

S
Shengliang Guan 已提交
1876 1877 1878 1879 1880 1881
      sNTrace(pNode, "enqueue heartbeat timer");
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
        rpcFreeCont(rpcMsg.pCont);
        return;
1882
      }
S
Shengliang Guan 已提交
1883 1884 1885 1886

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1887
    } else {
S
Shengliang Guan 已提交
1888 1889
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1890
    }
M
Minghao Li 已提交
1891 1892 1893
  }
}

1894 1895 1896 1897 1898
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
1899 1900 1901 1902
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1903 1904 1905 1906
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

M
Minghao Li 已提交
1907 1908 1909 1910
  if (pSyncNode->pRaftStore == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1911
  // sNTrace(pSyncNode, "eq peer hb timer");
1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
1923
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
1924 1925 1926 1927 1928 1929 1930
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
1931 1932
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
1933 1934 1935 1936 1937 1938 1939
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
1940
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
1941 1942 1943 1944
      }
#endif

      // send msg
M
Minghao Li 已提交
1945
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
1946 1947 1948

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
1949 1950
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
1951 1952 1953 1954 1955 1956
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
1957
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock,
1958 1959 1960 1961 1962
             msgLogicClock);
    }
  }
}

1963 1964 1965 1966 1967
/*
 * Propose a no-op entry through the sync queue. It builds a no-op entry at the
 * next write index for the current term, wraps it in a client-request message
 * and enqueues it with syncEqMsg.
 *
 * Only a leader may propose. Otherwise terrno is set to
 * TSDB_CODE_SYN_NOT_LEADER and -1 is returned. The message payload is freed
 * here if building or enqueueing fails.
 *
 * Returns 0 on success, non-zero on failure.
 */
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);  // the message holds its own copy from here on
  if (code != 0) {
    sNError(pNode, "failed to build noop msg");
    rpcFreeCont(rpcMsg.pCont);
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

1987 1988 1989
/*
 * LRU cache deleter for raft entries. It frees the cached value; the key is
 * stored inside the entry itself, so nothing else needs releasing.
 */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

/*
 * Insert pEntry into the log store's LRU cache, keyed by its index and charged
 * sizeof(entry) + dataLen, at low priority. *h receives the handle from
 * taosLRUCacheInsert. Returns 0 when the insert reports TAOS_LRU_STATUS_OK,
 * otherwise -1.
 */
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2001 2002 2003
/*
 * Build a no-op entry at the next write index for the current term and insert
 * it into the log-store cache. When this node is leader, the entry is also
 * appended to the log.
 *
 * The caller's reference to the entry is released on every path, including
 * an append failure: the LRU handle if the cache took it, otherwise the entry
 * itself.
 *
 * Returns 0 on success, -1 if the append failed.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sNError(ths, "append noop error");
      ret = -1;  // fall through so the entry/handle is still released
    }
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040
/*
 * Handle a heartbeat and always send a heartbeat reply back to its sender.
 * - Same term and not leader: reset the election timer and adopt the sender's
 *   minMatchIndex. As follower, also enqueue a follower-commit local command
 *   for pMsg->commitIndex.
 * - Term >= ours and not follower: enqueue a step-down local command.
 * Local commands are enqueued through syncEqMsg rather than executed here.
 * Each command object is destroyed after conversion, and its message payload
 * is freed whenever it is not handed to the queue.
 * Returns 0.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number; its meaning is not visible here

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->fcIndex = pMsg->commitIndex;

      SRpcMsg rpcMsgLocalCmd;
      syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pSyncMsg->fcIndex);
        }
      } else {
        // no queue to take ownership of the payload
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }

      syncLocalCmdDestroy(pSyncMsg);
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      // no queue to take ownership of the payload
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

/*
 * Handle a heartbeat reply by recording pMsg->startTime as the receive time
 * for pMsg->destId in the match-index manager. That timestamp is what later
 * liveness decisions about the peer rely on. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  SRaftId* pPeer = &(pMsg->destId);
  syncIndexMgrSetRecvTime(ths->pMatchIndex, pPeer, pMsg->startTime);

  return 0;
}

M
Minghao Li 已提交
2111
/*
 * Execute a local command dequeued from the sync queue.
 * - SYNC_LOCAL_CMD_STEP_DOWN runs syncNodeStepDown with sdNewTerm.
 * - SYNC_LOCAL_CMD_FOLLOWER_CMT runs syncNodeFollowerCommit with fcIndex.
 * Unknown commands are logged. Always returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pMsg->sdNewTerm);
    return 0;
  }

  if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pMsg->fcIndex);
    return 0;
  }

  sNError(ths, "error local cmd");
  return 0;
}

M
Minghao Li 已提交
2127 2128 2129 2130 2131 2132 2133 2134 2135 2136
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2137

2138
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
S
Shengliang Guan 已提交
2139
  sNTrace(ths, "on client request");
2140

M
Minghao Li 已提交
2141
  int32_t ret = 0;
2142
  int32_t code = 0;
M
Minghao Li 已提交
2143

M
Minghao Li 已提交
2144
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
2145
  SyncTerm        term = ths->pRaftStore->currentTerm;
2146 2147 2148 2149 2150 2151 2152
  SSyncRaftEntry* pEntry;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }
M
Minghao Li 已提交
2153

2154 2155 2156
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

M
Minghao Li 已提交
2157
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
2158 2159 2160
    // append entry
    code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
2161 2162 2163 2164 2165 2166
      if (ths->replicaNum == 1) {
        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
2167

2168 2169 2170 2171
        return -1;

      } else {
        // del resp mgr, call FpCommitCb
2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
2183
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
2184 2185 2186 2187 2188 2189 2190

        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }

2191 2192
        return -1;
      }
2193
    }
M
Minghao Li 已提交
2194

2195 2196
    // if mulit replica, start replicate right now
    if (ths->replicaNum > 1) {
M
Minghao Li 已提交
2197
      syncNodeReplicate(ths);
2198
    }
2199

2200 2201
    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
2202 2203 2204 2205 2206
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
2207
    }
M
Minghao Li 已提交
2208 2209
  }

2210 2211 2212 2213 2214 2215 2216 2217
  if (pRetIndex != NULL) {
    if (ret == 0 && pEntry != NULL) {
      *pRetIndex = pEntry->index;
    } else {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
  }

2218 2219 2220 2221 2222 2223
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

M
Minghao Li 已提交
2224
  return ret;
2225
}
M
Minghao Li 已提交
2226

S
Shengliang Guan 已提交
2227 2228 2229
/*
 * Return a printable name for a raft role.
 * Follower, candidate and leader map to their lowercase names; any other
 * value yields "error". The returned string is a literal and must not be freed.
 */
const char* syncStr(ESyncState state) {
  const char* roleName = "error";

  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    roleName = "follower";
  } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
    roleName = "candidate";
  } else if (state == TAOS_SYNC_STATE_LEADER) {
    roleName = "leader";
  }

  return roleName;
}
2239

2240
/*
 * Act on a leader-transfer log entry on this node.
 *
 * The transfer is ignored (returning 0) unless all of these hold:
 * this node is a follower, restore has finished, the entry's term is not
 * older than the current term, and the entry's index is not behind the
 * last log index.
 *
 * If the decoded message names this node as the new leader (same raft id,
 * or same fqdn and port), the election timer is restarted with a 1 ms timeout.
 * In every non-ignored case, the FSM's FpLeaderTransferCb is invoked when set.
 *
 * @param ths      sync node handling the entry
 * @param pRpcMsg  original RPC message carrying the SyncLeaderTransfer payload
 * @param pEntry   raft entry the message came from (term/index/seqNum are read)
 * @return 0 when handled or ignored; -1 if the payload could not be decoded.
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
  if (pSyncLeaderTransfer == NULL) {
    // Defensive: report a failed decode instead of dereferencing NULL below.
    sNError(ths, "failed to decode leader transfer msg, index:%" PRId64, pEntry->index);
    return -1;
  }
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (sameId || sameNodeInfo) {
    // This node is the transfer target: fire the election timer almost immediately.
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    // newLeaderId.addr is an unsigned 64-bit address (see syncUtilAddr2U64), so print it as PRIu64.
    sNTrace(ths, "maybe leader transfer to %s:%d %" PRIu64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
            pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
  }

  if (ths->pFsm != NULL && ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

2305
/*
 * Locate this node inside a new replica configuration.
 *
 * Each configured fqdn:port is converted to a raft id in this node's vgroup
 * and compared with myRaftId. On the first match, pNewCfg->myIndex is set
 * to that slot.
 *
 * @return 0 when this node is part of pNewCfg, -1 otherwise (myIndex untouched).
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;
  while (slot < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[slot].nodeFqdn, pNewCfg->nodeInfo[slot].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }
    ++slot;
  }

  return -1;
}

2320 2321 2322 2323
/*
 * True when a message can take the single-replica fast path.
 * Requires exactly one replica, a user-commit message type, and vgId != 1.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2324
/*
 * Apply the committed log range [beginIndex, endIndex] to the state machine.
 *
 * If the FSM reports a snapshot whose lastApplyIndex already covers the start
 * of the range, those indexes are skipped and application resumes at
 * lastApplyIndex + 1. Each remaining entry is taken from the log-store LRU
 * cache, or read from the log store on a cache miss.
 *
 * For each entry:
 *   - user-commit entries are passed to FpCommitCb, except on single-replica
 *     groups with vgId != 1 that have finished restore;
 *   - once the last log index is reached and restore has not finished yet,
 *     FpRestoreFinishCb is invoked and restoreFinish is set.
 * Entries that cannot be read are logged and skipped.
 *
 * @param ths        sync node
 * @param beginIndex first index to apply (inclusive)
 * @param endIndex   last index to apply (inclusive)
 * @param flag       forwarded unchanged as SFsmCbMeta.flag
 * @return 0 on success or empty range; -1 if ths is NULL.
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first: indexes already covered by the
    // snapshot are not replayed from the WAL
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) {
      continue;
    }

    // Initialized so the NULL check below never reads an indeterminate pointer.
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
    if (h) {
      // cached entry: released back to the cache at the end of the iteration
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    } else {
      // loaded entry: destroyed at the end of the iteration
      int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (code != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      bool internalExecute = true;
      if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
        internalExecute = false;
      }

      sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);

      // execute fsm in apply thread, or execute outside syncPropose
      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

    // Leader-transfer entries are not applied here; they are executed in pre-commit.

    // restore finish
    // if only snapshot, a noop entry will be appended, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
      if (ths->restoreFinish == false) {
        if (ths->pFsm->FpRestoreFinishCb != NULL) {
          ths->pFsm->FpRestoreFinishCb(ths->pFsm);
        }
        ths->restoreFinish = true;

        int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
        sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
      }
    }

    rpcFreeCont(rpcMsg.pCont);
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pEntry);
    }
  }

  return 0;
}

/* Return true if pRaftId matches any entry of ths->replicasId[0 .. replicaNum). */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool isMember = false;
  for (int32_t slot = 0; !isMember && slot < ths->replicaNum; ++slot) {
    isMember = syncUtilSameId(&ths->replicasId[slot], pRaftId);
  }
  return isMember;
}

/*
 * Return the snapshot sender paired with the replica whose id equals pDestId,
 * or NULL if no replica matches. The whole list is scanned, so if the id
 * appears more than once the last matching slot wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  int32_t matchSlot = -1;
  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      matchSlot = slot;
    }
  }
  return (matchSlot >= 0) ? ths->senders[matchSlot] : NULL;
}
M
Minghao Li 已提交
2454

2455 2456
/*
 * Return a pointer to the per-peer heartbeat timer slot for pDestId, or NULL
 * if pDestId is not one of the replicas. The last matching slot wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t matchSlot = -1;
  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      matchSlot = slot;
    }
  }
  if (matchSlot < 0) {
    return NULL;
  }
  return &ths->peerHeartbeatTimerArr[matchSlot];
}

M
Minghao Li 已提交
2465 2466
/*
 * Return a pointer to the peer-state slot for pDestId, or NULL if pDestId is
 * not one of the replicas. The last matching slot wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  int32_t matchSlot = -1;
  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      matchSlot = slot;
    }
  }
  if (matchSlot < 0) {
    return NULL;
  }
  return &ths->peerStates[matchSlot];
}

/*
 * Decide whether an AppendEntries message should actually be sent to pDestId.
 *
 * Sending is suppressed when the peer was already sent the same next index
 * (prevLogIndex + 1) less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. It is also
 * suppressed, with an error log, when pDestId has no peer state.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // Different index than last time: always send.
  if (pPeer->lastSendIndex != nextIndex) {
    return true;
  }
  // Same index: resend only once the previous attempt has timed out.
  return nowMs - pPeer->lastSendTime >= SYNC_APPEND_ENTRIES_TIMEOUT_MS;
}

M
Minghao Li 已提交
2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505
/*
 * Check whether the node is in a state where a configuration change may start.
 *
 * All of these must hold:
 *   - no change is already in progress (changing == false);
 *   - if a valid commit index exists, it equals the last log index;
 *   - no peer's snapshot sender is currently started.
 * Each failing condition logs a distinct error and returns false.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasValidCommit = pSyncNode->commitIndex >= SYNC_INDEX_BEGIN;
  if (hasValidCommit && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    bool                 sending = (pPeerSender != NULL) && pPeerSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}

2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529
/*
 * Return a short printable name for a sync timer type:
 * "ping", "elect", "heartbeat", or "unknown" for anything else.
 */
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  switch (timerType) {
    case SYNC_TIMEOUT_PING:
      return "ping";
    case SYNC_TIMEOUT_ELECTION:
      return "elect";
    case SYNC_TIMEOUT_HEARTBEAT:
      return "heartbeat";
    default:
      return "unknown";
  }
}

/*
 * Trace receipt of a sync timer message.
 * Logs its type name, logic clock, period (ms) and data pointer; `s` is appended as extra context.
 */
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
          syncTimerTypeStr(pMsg->timeoutType),
          pMsg->logicClock,
          pMsg->timerMS,
          pMsg->data,
          s);
}

2534
/*
 * Trace an outgoing AppendEntries reply.
 * The destination (pMsg->destId) is resolved to fqdn:port for readability;
 * `s` is appended as extra context.
 */
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}

wafwerar's avatar
wafwerar 已提交
2545
/*
 * Trace an incoming AppendEntries reply.
 * The sender (pMsg->srcId) is resolved to fqdn:port for readability;
 * `s` is appended as extra context.
 */
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
2555 2556 2557 2558 2559

/*
 * Trace an outgoing heartbeat.
 * Logs the destination fqdn:port, term, commit index, min-match index and private term;
 * `s` is appended as extra context.
 */
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

/*
 * Trace an incoming heartbeat.
 * Logs the sender fqdn:port, term, commit index, min-match index and private term;
 * `s` is appended as extra context.
 */
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

/*
 * Trace an outgoing heartbeat reply.
 * The destination (pMsg->destId) is resolved to fqdn:port; `s` is appended as extra context.
 */
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);

  // The address printed is the destination, so say "to" (consistent with the other send-side loggers).
  sNTrace(pSyncNode, "send sync-heartbeat-reply to %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
          pMsg->term, pMsg->privateTerm, s);
}

/*
 * Trace an incoming heartbeat reply.
 * The sender (pMsg->srcId) is resolved to fqdn:port; `s` is appended as extra context.
 */
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, s);
}
2594 2595

/*
 * Trace receipt of a local sync command.
 * Logs the command code and its name, sdNewTerm and fcIndex; `s` is appended as extra context.
 */
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd),
          pMsg->sdNewTerm,
          pMsg->fcIndex,
          s);
}

/* Trace an outgoing pre-snapshot message with its destination fqdn:port and term; `s` is appended. */
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, s);
}

/* Trace an incoming pre-snapshot message with its sender fqdn:port and term; `s` is appended. */
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, s);
}

/*
 * Trace an outgoing pre-snapshot reply.
 * Logs the destination fqdn:port, term and snapStart; `s` is appended as extra context.
 */
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->snapStart, s);
}

/*
 * Trace an incoming pre-snapshot reply.
 * Logs the sender fqdn:port, term and snapStart; `s` is appended as extra context.
 */
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     peerFqdn[64] = {0};
  uint16_t peerPort = 0;
  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);

  sNTrace(pSyncNode,
          "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          peerFqdn, peerPort, pMsg->term, pMsg->snapStart, s);
}
M
Minghao Li 已提交
2629 2630 2631 2632 2633 2634 2635 2636

/* Currently a no-op: outgoing snapshot-send messages are not traced.
 * NOTE(review): confirm whether this should log like the pre-snapshot helpers. */
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* Currently a no-op: incoming snapshot-send messages are not traced.
 * NOTE(review): confirm whether this should log like the pre-snapshot helpers. */
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* Currently a no-op: outgoing snapshot responses are not traced.
 * NOTE(review): confirm whether this should log like the pre-snapshot helpers. */
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

/* Currently a no-op: incoming snapshot responses are not traced.
 * NOTE(review): confirm whether this should log like the pre-snapshot helpers. */
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}