syncMain.c 86.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
M
Minghao Li 已提交
48

49
/**
 * Creates a sync node from pSyncInfo and registers it.
 *
 * On success, the ping/election/heartbeat baselines and the message callbacks are copied from pSyncInfo and the
 * registration id (rid) is returned. On failure -1 is returned; if registration fails, the freshly opened node is
 * closed again.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Ping and heartbeat timers start at their baseline; for elections only the baseline is recorded here.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
72 73 74 75
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
76 77 78
  }
}

M
Minghao Li 已提交
79
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
80
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
81
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
82
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
83
    syncNodeRemove(rid);
M
Minghao Li 已提交
84
  }
S
Shengliang Guan 已提交
85
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
89 90 91 92
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
93 94
}

S
Shengliang Guan 已提交
95 96 97
/*
 * A new configuration is acceptable only if this node is a member of it and the replica count
 * differs from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int32_t delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
100
/**
 * Applies a new sync configuration to the node registered under rid.
 *
 * The configuration is rejected (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR) when syncNodeCheckNewConfig() fails, i.e.
 * this node is not part of it or the replica count changes by more than one. When the node is currently leader,
 * the per-peer heartbeat timers are re-initialised and heartbeats plus replication are restarted.
 *
 * @return 0 on success, -1 if the node does not exist or the configuration is rejected.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held. After syncNodeRelease() the node may be freed concurrently
    // (e.g. by syncStop), so pSyncNode must not be dereferenced past that point.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    syncNodeRelease(pSyncNode);
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // Rebuild every per-peer heartbeat timer slot from replicasId, which is presumably updated by
    // syncNodeDoConfigChange above. Then restart heartbeats and replicate.
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
128

S
Shengliang Guan 已提交
129 130 131 132
/**
 * Dispatches an incoming sync RPC message to the handler for its msgType on the node registered under rid.
 *
 * For heartbeat, append-entries, snapshot and local-cmd messages, the payload is decoded from the RPC message,
 * handed to the handler, and the decoded copy is destroyed afterwards. Timeout, client-request and vote messages
 * are passed to their handlers as the raw SRpcMsg.
 *
 * @return the handler's result code, or -1 when sync is not initialised, the node cannot be acquired, or the message
 *         type is not recognised.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    code = syncNodeOnTimer(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    // Capture the handler's result like every other branch; otherwise this type would always report -1.
    code = syncNodeOnRequestVote(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);
  } else {
    sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
181
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
182
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
183
  if (pSyncNode == NULL) return -1;
184

S
Shengliang Guan 已提交
185
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
186
  syncNodeRelease(pSyncNode);
187 188 189
  return ret;
}

M
Minghao Li 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
/**
 * Returns the smallest match index recorded for any peer of pSyncNode,
 * or SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    lowest = (candidate < lowest) ? candidate : lowest;
  }

  return lowest;
}

206
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
207
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
208
  if (pSyncNode == NULL) {
209
    sError("sync begin snapshot error");
210 211
    return -1;
  }
212

213 214
  int32_t code = 0;

M
Minghao Li 已提交
215
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
216 217 218
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
219 220 221
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
222 223 224
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
225 226
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
227
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
228 229 230
      return 0;
    }

M
Minghao Li 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
248 249 250 251
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
252 253
            } while (0);

S
Shengliang Guan 已提交
254
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
255 256 257 258 259 260
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
261 262 263
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
264
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
265 266 267 268
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
269
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
270
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
271 272 273
        return 0;

      } else {
S
Shengliang Guan 已提交
274
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
275
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
276 277 278 279 280 281 282 283 284
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
285 286 287
    }
  }

M
Minghao Li 已提交
288
_DEL_WAL:
289

M
Minghao Li 已提交
290
  do {
291 292 293 294
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
295
      pSyncNode->snapshottingTime = taosGetTimestampMs();
296

M
Minghao Li 已提交
297 298 299
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
S
Shengliang Guan 已提交
300 301
        sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
302
      } else {
S
Shengliang Guan 已提交
303
        sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
S
Shengliang Guan 已提交
304
                terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
305 306
        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
307 308

    } else {
S
Shengliang Guan 已提交
309 310
      sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
              snapshottingIndex, lastApplyIndex);
311
    }
M
Minghao Li 已提交
312
  } while (0);
313

S
Shengliang Guan 已提交
314
  syncNodeRelease(pSyncNode);
315 316 317 318
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
319
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
320
  if (pSyncNode == NULL) {
321
    sError("sync end snapshot error");
322 323 324
    return -1;
  }

325 326 327 328
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
329
    if (code != 0) {
330
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
331
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
332 333
      return -1;
    } else {
S
Shengliang Guan 已提交
334
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
335 336
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
337
  }
338

S
Shengliang Guan 已提交
339
  syncNodeRelease(pSyncNode);
340 341 342
  return code;
}

M
Minghao Li 已提交
343
/**
 * Makes the node registered under rid step down to newTerm.
 *
 * @return 0 on success, -1 if no node is registered under rid.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

355 356 357
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
358
    sError("sync ready for read error");
359 360 361 362 363 364 365 366 367 368
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
369 370 371
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
372

373 374 375 376
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
        SSyncRaftEntry* pEntry = NULL;
        int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(
S
Shengliang Guan 已提交
377
                    pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
378 379 380 381 382 383 384
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

          syncEntryDestory(pEntry);
        }
385 386 387 388
      }
    }
  }

389 390 391 392 393 394 395 396
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

397 398 399 400
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
401 402
/**
 * Transfers leadership of pSyncNode to its first peer.
 *
 * @return -1 (terrno = TSDB_CODE_SYN_ONE_REPLICA) when there are no peers, 0 when the node is not leader (nothing
 *         to do), otherwise the result of syncNodeLeaderTransferTo.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[0]);
}

/**
 * Proposes a leader-transfer entry naming newLeader through syncNodePropose.
 *
 * @return -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA for a single-replica group, -1 if the message cannot be built,
 *         otherwise the result of syncNodePropose.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Validate before any field is written through pMsg.
  if (pMsg == NULL) {
    // NOTE(review): assumes a NULL result means allocation failure inside syncLeaderTransferBuild.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sNError(pSyncNode, "failed to build leader transfer msg to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

439 440
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
441

S
Shengliang Guan 已提交
442
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
443 444 445 446
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
447 448
  }

449
  return state;
M
Minghao Li 已提交
450 451
}

452
#if 0
// The snapshot query helpers below are compiled out (#if 0) and are not part of the build.

int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  // Reuse the shared selection rule (largest config index not beyond snapshotIndex) instead of duplicating the loop.
  sMeta->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, snapshotIndex);
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
523

524 525 526 527
/**
 * Picks the config index that applies to a snapshot ending at snapshotLastApplyIndex.
 *
 * Starting from configIndexArr[0], returns the largest recorded config index that is greater than that start value
 * and not beyond snapshotLastApplyIndex. If no entry qualifies, configIndexArr[0] is returned.
 * Requires at least one recorded config index (asserted).
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  const SyncIndex* pIndexes = pSyncNode->pRaftCfg->configIndexArr;
  SyncIndex        chosen = pIndexes[0];

  for (int32_t k = 0; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = pIndexes[k];
    if (candidate <= snapshotLastApplyIndex && candidate > chosen) {
      chosen = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, chosen);
  return chosen;
}

S
Shengliang Guan 已提交
540
/**
 * Fills pEpSet with the endpoints of every replica in the node's current raft config.
 *
 * inUse points at the replica after this node (wrapping around). pEpSet->numOfEps is reset to 0 first, so it stays
 * 0 when no node is registered under rid.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return;
  }

  const SSyncCfg* pCfg = &pSyncNode->pRaftCfg->cfg;
  for (int32_t idx = 0; idx < pCfg->replicaNum; ++idx) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[idx];
    SEp*             pEp = &pEpSet->eps[idx];

    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, idx, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
560

M
Minghao Li 已提交
561
/**
 * Proposes pMsg on the node registered under rid (see syncNodePropose).
 *
 * @return the result of syncNodePropose, or -1 if no node is registered under rid.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak);
    syncNodeRelease(pNode);
    return result;
  }

  sError("sync propose error");
  return -1;
}

573
/**
 * Proposes pMsg to the raft log of pSyncNode.
 *
 * Fails with terrno = TSDB_CODE_SYN_NOT_LEADER when the node is not leader. Fails with
 * TSDB_CODE_SYN_PROPOSE_NOT_READY when restore has not finished; vgId 1 is exempt from that check (presumably the
 * mnode — confirm).
 *
 * Optimized path: when syncNodeIsOptimizedOneReplica() accepts the message, it is handled synchronously via
 * syncNodeOnClientRequest(). On success the assigned index and current term are stored in pMsg->info.conn and 1 is
 * returned.
 *
 * Queued path: otherwise the message is registered with the response manager, wrapped into a client request and
 * enqueued through syncEqMsg. The response-manager entry is dropped if serialization or enqueueing fails.
 *
 * @return 1 when handled on the optimized path, the syncEqMsg result when queued, -1 on failure.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    } else {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }
  } else {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
    SRpcMsg   rpcMsg = {0};
    int32_t   code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
      return -1;
    }

    sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    }

    return code;
  }
}

S
Shengliang Guan 已提交
626
/*
 * Resets a per-peer heartbeat timer slot for destId.
 * The slot uses the node's heartbeat baseline, the syncNodeEqPeerHeartbeatTimer callback, and a zero logic clock.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  atomic_store_64(&pSyncTimer->logicClock, 0);
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  return 0;
}

S
Shengliang Guan 已提交
636
/*
 * Arms a per-peer heartbeat timer.
 *
 * Allocates the callback data (node, timer, destination id, current logic clock) and stores it in
 * pSyncTimer->pData, then schedules timerCb after timerMS ms.
 *
 * Returns 0 on success. Also returns 0 (after logging an error) when the sync env is not initialised, preserving
 * the existing contract. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the callback data cannot be allocated.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(*pData));
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start ctrl hb timer error since out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  // NOTE(review): any previous pSyncTimer->pData is overwritten here without being freed.
  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
653
/*
 * Disarm a per-peer heartbeat timer.
 *
 * The timer's logic clock is advanced before the timer is stopped.
 * Presumably this lets a callback that still fires see that its snapshot
 * (SSyncHbTimerData.logicClock) is stale; confirm against the callback.
 * The timer handle is then cleared. Always returns 0.
 *
 * pSyncTimer->pData is intentionally not freed here. The free was
 * commented out in the original, likely because the callback may still
 * reference it (TODO confirm).
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;
  return 0;
}

S
Shengliang Guan 已提交
662 663
/*
 * Create and initialise a sync node described by pSyncInfo.
 *
 * Steps:
 *  1. Ensure pSyncInfo->path exists (created if missing).
 *  2. Reconcile pSyncInfo->syncCfg with <path>/raft_config.json:
 *     - if the file is missing, it is created from the input config;
 *     - otherwise a non-empty input config that differs from the file is
 *       persisted to the file, and in every other case the file's config
 *       is copied back into pSyncInfo->syncCfg.
 *  3. Re-open the raft config and the raft store, and derive my/peer/replica
 *     raft ids, quorum and commit index (from the FSM snapshot, if any).
 *  4. Create vote, index, log-store, resp-mgr and snapshot helpers and set
 *     timer parameters. Timers are not started here (see syncNodeStart).
 *
 * Ownership: on success pSyncInfo->pFsm is moved into the node and cleared
 * in pSyncInfo. On failure, pSyncInfo->pFsm (if still set) is freed, the
 * partially built node is released through syncNodeClose, and NULL is
 * returned.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Assign vgId first so every log line below reports the real group id.
  // It used to be set only after the config handling, so those logs said vgId:0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file from the input config
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // The file was just written from pSyncInfo->syncCfg, so there is nothing
      // to copy back. The old code assigned pSyncNode->pRaftCfg->cfg here, but
      // pRaftCfg is never opened on this path (still NULL): a NULL dereference.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  // pSyncNode->configPath was already built above from the same path.

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId (every replica except myself)
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; the FSM is moved from pSyncInfo into the node
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts at the FSM snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft is started later, in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  sNTrace(pSyncNode, "sync open");

  return pSyncNode;

_error:
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
950 951 952 953 954 955 956 957 958 959 960
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
 * snapshot is ahead. This is a no-op when there is no FSM or it has no
 * FpGetSnapshotInfo callback.
 *
 * The snapshot is zero-initialised and the update is skipped when the
 * callback fails. Previously an uninitialised SSnapshot could be read if the
 * ASSERT did not abort in the current build.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
961 962
/*
 * Start raft for this node.
 *
 * A group with more than one replica starts as follower. A single-replica
 * group advances the term, becomes leader immediately and appends a no-op so
 * entries from earlier terms can be committed (Raft 3.6.2). In both cases
 * the ping timer is started afterwards.
 */
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingRet == 0);
}

M
Minghao Li 已提交
980 981 982 983 984 985 986 987 988
/*
 * Start this node in stand-by mode.
 *
 * The node becomes a follower and its heartbeat timers are stopped. The
 * election timer is restarted with TIMER_MAX_MS so it does not fire soon,
 * and the ping timer is started.
 */
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
995 996 997 998 999 1000 1001 1002
/*
 * First phase of shutdown: stop the election timer and the heartbeat timers.
 * The node itself is released later by syncNodeClose.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
}

M
Minghao Li 已提交
1003
/*
 * Release a sync node and everything it owns. Passing NULL is a no-op.
 *
 * Timers are stopped first, so no timer is left armed while the raft store,
 * log store, vote/index managers and raft config are being destroyed. The
 * previous version stopped them only after all of that had been freed.
 * After this call, pSyncNode must not be used.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  sNTrace(pSyncNode, "sync close");

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1053
// option
M
Minghao Li 已提交
1054 1055
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1056
/* Return the snapshot strategy recorded in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1057

M
Minghao Li 已提交
1058 1059 1060
// timer control --------------
/*
 * Arm the node's ping timer with period pingTimerMS, then publish the
 * user-side logic clock as the timer's current logic clock. If the sync env
 * is not initialised, the failure is only logged. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Disarm the ping timer. The user-side logic clock is advanced before the
 * timer is stopped, and the handle is then cleared. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

/*
 * Arm the election timer to fire after ms milliseconds.
 *
 * A new SElectTimer carries the node and a snapshot of electTimerLogicClock
 * to the callback. NOTE(review): it is presumably freed by the timer
 * callback; confirm the ownership.
 *
 * Returns:
 *   0  - timer armed, or sync env not initialised (only logged, as before;
 *        electTimerMS is left unchanged in that case);
 *   -1 - timer data could not be allocated; terrno is set to
 *        TSDB_CODE_OUT_OF_MEMORY. Previously this path dereferenced NULL.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(*pElectTimer));
  if (pElectTimer == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start elect timer error, out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Disarm the election timer. The logic clock is advanced first, then the
 * timer is stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

/*
 * Stop the election timer and re-arm it with ms milliseconds. The results
 * of the stop/start calls are not propagated; this always returns 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1114 1115
/*
 * Restart the election timer with a fresh timeout.
 *
 * Stand-by nodes use TIMER_MAX_MS. Other nodes draw a random value from
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine). Returns the
 * result of syncNodeRestartElectTimer.
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return ret;
}

M
Minghao Li 已提交
1130
/*
 * Arm the node-wide heartbeat timer with period heartbeatTimerMS and publish
 * the user-side logic clock as the timer's current logic clock. If the sync
 * env is not initialised, the failure is only logged. The trace line is
 * emitted in both cases. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1144
/*
 * Arm one heartbeat timer per peer. Peers for which syncNodeGetHbTimer
 * returns NULL are skipped. The node-wide heartbeat timer
 * (syncNodeDoStartHeartbeatTimer) is not armed here. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1162 1163
/*
 * Disarm every per-peer heartbeat timer. Peers for which syncNodeGetHbTimer
 * returns NULL are skipped. The node-wide heartbeat timer is not touched
 * here. Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pTimer = syncNodeGetHbTimer(pSyncNode, &pSyncNode->peersId[peer]);
    if (pTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pTimer);
  }

  return 0;
}

1181 1182 1183 1184 1185 1186
/* Stop and then re-arm all per-peer heartbeat timers. Always returns 0. */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1187 1188 1189
// utils --------------
/*
 * Send pMsg to the endpoint derived from destRaftId via the node's
 * syncSendMSg hook. The body is first converted with syncUtilMsgHtoN, and
 * the message is marked as not expecting a response.
 *
 * Returns 0 after handing the message to the hook, or -1 if no hook is
 * installed.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // host-to-network conversion of the body
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the endpoint described by nodeInfo via the node's
 * syncSendMSg hook. The body is first converted with syncUtilMsgHtoN, and
 * the message is marked as not expecting a response.
 *
 * Returns 0 after handing the message to the hook, or -1 if no hook is
 * installed. This matches syncNodeSendMsgById; previously this function
 * logged the error but still reported success (0).
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1220
/*
 * Report whether this node is a member of config.
 *
 * Membership is determined twice: by matching fqdn+port against myNodeInfo,
 * and by building a raft id (addr from fqdn/port, this node's vgId) and
 * comparing it with myRaftId. The two answers are asserted to agree, and the
 * fqdn/port answer is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool byEp = false;
  for (int32_t i = 0; !byEp && i < config->replicaNum; ++i) {
    byEp = strcmp(config->nodeInfo[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
           config->nodeInfo[i].nodePort == pSyncNode->myNodeInfo.nodePort;
  }

  bool byId = false;
  for (int32_t i = 0; !byId && i < config->replicaNum; ++i) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(config->nodeInfo[i].nodeFqdn, config->nodeInfo[i].nodePort);
    candidate.vgId = pSyncNode->vgId;
    byId = syncUtilSameId(&candidate, &(pSyncNode->myRaftId));
  }

  ASSERT(byEp == byId);
  return byEp;
}

1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259
/*
 * Return true when the two configs differ in replica count, self index, or
 * the fqdn/port of any replica at the same position. Otherwise return false.
 */
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  bool sameShape = pOldCfg->replicaNum == pNewCfg->replicaNum && pOldCfg->myIndex == pNewCfg->myIndex;
  if (!sameShape) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pA = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pB = &pNewCfg->nodeInfo[idx];
    if (pA->nodePort != pB->nodePort || strcmp(pA->nodeFqdn, pB->nodeFqdn) != 0) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1260
/*
 * Apply pNewConfig, which took effect at lastConfigChangeIndex.
 *
 * If the config is unchanged (syncIsConfigChanged), nothing happens.
 * Otherwise the raft cfg gets the new cfg and lastConfigIndex, isStandBy is
 * updated (normal when in the new config, stand-by when dropped), the config
 * index is recorded, and the cfg is persisted.
 *
 * When this node is part of the new config, the following are rebuilt from
 * the new cfg and the node re-enters its role:
 *   - self/peer/replica ids;
 *   - quorum;
 *   - index and vote managers;
 *   - snapshot senders (reused for replicas present in both configs, otherwise
 *     created; leftovers destroyed).
 * A leader re-becomes leader and appends a no-op; any other state becomes
 * follower.
 *
 * Fixes compared with the previous version:
 *  - the new config string was written into oldCfgStr, so newCfgStr stayed
 *    empty in every log line;
 *  - the "not changed" log hardcoded vgId:1;
 *  - sender reuse scanned stale ids beyond the old replica count and did not
 *    stop at the first match;
 *  - persist failures are now logged.
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save the current replica ids and snapshot senders
    int32_t oldReplicaNum = pSyncNode->replicaNum;
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId (every replica except myself)
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear the slots first
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reuse the old sender of every replica present in both configs
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      // Only the first oldReplicaNum old ids are live. Entries beyond that can
      // be stale ids left over from an earlier, larger config. Stop at the first
      // match so a replica cannot pick up (and orphan) more than one sender.
      for (int32_t k = 0; k < oldReplicaNum; ++k) {
        if (oldSenders[k] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k])) {
          continue;
        }

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
        sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

        (pSyncNode->senders)[i] = oldSenders[k];
        oldSenders[k] = NULL;
        bool reset = true;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                i, host, port, (pSyncNode->senders)[i], reset);
        break;
      }
    }

    // create new senders for slots that were not reused
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
      }
    }

    // free old senders that were not reused
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);
        sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }
    sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
            pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1430 1431 1432 1433
// raft state change --------------
// Adopt `term` if it is newer than the raft store's current term: store it,
// step down to follower (the reason text carries the new term) and clear the
// vote recorded for the old term. Older or equal terms are ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1441 1442 1443 1444 1445 1446
// Store `term` in the raft store if it is newer than the current term, but
// leave the node's role, timers and vote untouched (unlike syncNodeUpdateTerm).
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  bool isNewer = term > pSyncNode->pRaftStore->currentTerm;
  if (!isNewer) {
    return;
  }
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1447
// Step down in response to a message carrying `newTerm`.
//  - newTerm older than the current term: ignored (only traced).
//  - newTerm newer: store it, become follower and clear the old vote.
//  - newTerm equal: become follower unless already one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm curTerm = pSyncNode->pRaftStore->currentTerm;

  if (newTerm < curTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (newTerm == curTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1473 1474
// Clean the responses pending in the node's response manager; invoked when the
// node becomes follower so waiting clients are answered.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1475
// Switch this node to the follower role.
// A former leader forgets the cached leader id. The heartbeat timer is stopped,
// the election timer reset, pending client responses are cleaned, the FSM's
// become-follower callback (if any) is invoked and minMatchIndex is reset.
// `debugStr` is only used in the trace log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  // answer clients still waiting on this node
  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasFollowerCb = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL;
  if (hasFollowerCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1521
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1522 1523
  pSyncNode->leaderTime = taosGetTimestampMs();

1524 1525 1526
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1527
  // state change
M
Minghao Li 已提交
1528
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1529 1530

  // set leader cache
M
Minghao Li 已提交
1531 1532
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1533
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1534 1535
    // maybe overwrite myself, no harm
    // just do it!
1536 1537 1538 1539 1540 1541 1542 1543 1544

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1545 1546
  }

S
Shengliang Guan 已提交
1547
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1548 1549
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1550 1551 1552
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1553 1554 1555
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1556
#if 0
1557 1558
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1559
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1560
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1561 1562 1563
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
1564
    }
1565
    (pMySender->privateTerm) += 100;
1566
  }
M
Minghao Li 已提交
1567
#endif
1568

1569 1570 1571 1572 1573
  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

M
Minghao Li 已提交
1574
  // stop elect timer
M
Minghao Li 已提交
1575
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1576

M
Minghao Li 已提交
1577 1578
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1579

M
Minghao Li 已提交
1580 1581
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1582

1583 1584 1585 1586 1587
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1588 1589 1590
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

M
Minghao Li 已提交
1591
  // trace log
S
Shengliang Guan 已提交
1592
  sNTrace(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1593 1594 1595
}

// Promote a candidate holding a majority of granted votes to leader. Per Raft
// 3.6.2, a noop entry is appended so that entries from previous terms can be
// committed. With more than one replica, replication starts right away.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1611 1612
// True when this sync node belongs to vgroup 1 (treated as the mnode group).
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnodeGroup = pSyncNode->vgId == 1;
  return isMnodeGroup;
}

M
Minghao Li 已提交
1613
// Reset the per-replica send bookkeeping (last sent index and last send time)
// for all TSDB_MAX_REPLICA slots. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }
  return 0;
}

// Role transition follower -> candidate. Only the state field is changed (and
// traced); timers and votes are not touched here.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

// Role transition leader -> follower; the actual work is done by
// syncNodeBecomeFollower.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  sNTrace(pSyncNode, "leader to follower");
}

// Role transition candidate -> follower; the actual work is done by
// syncNodeBecomeFollower.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  sNTrace(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
1641 1642 1643

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1644
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1645 1646
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1647 1648 1649 1650

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1651
// simulate get vote from outside
M
Minghao Li 已提交
1652
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1653
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1654

S
Shengliang Guan 已提交
1655 1656
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1657
  if (ret != 0) return;
S
Shengliang Guan 已提交
1658 1659

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1660 1661 1662 1663 1664 1665 1666
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1667
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1668 1669
}

M
Minghao Li 已提交
1670
// return if has a snapshot
M
Minghao Li 已提交
1671 1672
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1673
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1674 1675
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1676 1677 1678 1679 1680 1681 1682
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1683 1684
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1685
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1686
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1687 1688
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1689 1690 1691 1692 1693 1694 1695
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1696 1697
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1698 1699
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1700 1701
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1702
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1703 1704
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1705 1706
    }

M
Minghao Li 已提交
1707 1708 1709
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1710 1711 1712 1713
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1714
  } else {
M
Minghao Li 已提交
1715 1716
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1717
  }
M
Minghao Li 已提交
1718

M
Minghao Li 已提交
1719 1720 1721 1722 1723 1724 1725
  return lastTerm;
}

// Store the last index and last term (snapshot-aware) into the out params.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1728

M
Minghao Li 已提交
1729
// return append-entries first try index
M
Minghao Li 已提交
1730 1731 1732 1733 1734
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1735 1736
// if index > 0, return index - 1
// else, return -1
1737 1738 1739 1740 1741 1742 1743 1744 1745
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1746 1747 1748 1749
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm        preTerm = 0;
  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
M
Minghao Li 已提交
1763 1764 1765 1766 1767 1768

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1769 1770 1771 1772 1773 1774
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  } else {
1775 1776 1777 1778
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1779 1780 1781 1782
      }
    }
  }

1783
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1784
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1785 1786
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1787 1788 1789 1790

// Store into the out params the index and term of the entry preceding `index`.
// Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIndex;
  *pPreTerm = preTerm;
  return 0;
}

M
Minghao Li 已提交
1795
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
    int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
      sNError(pNode, "failed to build ping msg");
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1807
    }
M
Minghao Li 已提交
1808

S
Shengliang Guan 已提交
1809 1810 1811 1812 1813 1814
    sNTrace(pNode, "enqueue ping msg");
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
      return;
1815 1816
    }

S
Shengliang Guan 已提交
1817
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1818
  } else {
1819
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
S
Shengliang Guan 已提交
1820
           pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1821 1822 1823 1824
  }
}

// Election timer callback: enqueue a SYNC_TIMEOUT_ELECTION message carrying the
// timer's logic clock. The SElectTimer passed as `param` is freed on every path
// past the syncIsInit() check. The timer is not re-armed here.
static void syncNodeEqElectTimer(void* param, void* tmrId) {
  if (!syncIsInit()) return;

  SElectTimer* pElectTimer = param;
  SSyncNode*   pNode = pElectTimer->pSyncNode;
  SRpcMsg      rpcMsg = {0};

  if (syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode) != 0) {
    sNError(pNode, "failed to build elect msg");
  } else {
    SyncTimeout* pTimeout = rpcMsg.pCont;
    sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);

    if (pNode->syncEqMsg(pNode->msgcb, &rpcMsg) != 0) {
      sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
    }
  }

  taosMemoryFree(pElectTimer);
}

M
Minghao Li 已提交
1863
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1864
  if (!syncIsInit()) return;
1865

S
Shengliang Guan 已提交
1866 1867 1868 1869 1870 1871 1872 1873 1874 1875
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
      int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sNError(pNode, "failed to build heartbeat msg");
        return;
1876
      }
M
Minghao Li 已提交
1877

S
Shengliang Guan 已提交
1878 1879 1880 1881 1882 1883
      sNTrace(pNode, "enqueue heartbeat timer");
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
        rpcFreeCont(rpcMsg.pCont);
        return;
1884
      }
S
Shengliang Guan 已提交
1885 1886 1887 1888

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1889
    } else {
S
Shengliang Guan 已提交
1890 1891
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1892
    }
M
Minghao Li 已提交
1893 1894 1895
  }
}

1896 1897 1898 1899 1900
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
1901 1902 1903 1904
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1905 1906 1907 1908
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

M
Minghao Li 已提交
1909 1910 1911 1912
  if (pSyncNode->pRaftStore == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1913
  // sNTrace(pSyncNode, "eq peer hb timer");
1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
1925
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
1926 1927 1928 1929 1930 1931 1932
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
1933 1934
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
1935 1936 1937 1938 1939 1940 1941
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
1942
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
1943 1944 1945 1946
      }
#endif

      // send msg
M
Minghao Li 已提交
1947
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
1948 1949 1950

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
1951 1952
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
1953 1954 1955 1956 1957 1958
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
1959
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock,
1960 1961 1962 1963 1964
             msgLogicClock);
    }
  }
}

1965 1966 1967 1968 1969
// Propose a noop entry by enqueueing it as a client request through syncEqMsg.
// Only the leader proposes entries. Any other node fails with
// terrno = TSDB_CODE_SYN_NOT_LEADER.
// Returns 0 on success and non-zero on failure. The enqueued message is freed
// here whenever it was not handed off.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);
  if (code != 0) {
    sNError(pNode, "failed to build noop client request");
    rpcFreeCont(rpcMsg.pCont);
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
    // the queue did not take ownership
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

1989 1990 1991
// Deleter registered with the log-store LRU cache (see syncCacheEntry): the
// cached value is a raft entry buffer and is released with taosMemoryFree.
// The key is not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Insert pEntry into the log store's LRU cache, keyed by its index, with low
// priority and a charge of header + payload bytes. On success *h receives the
// cache handle. Returns 0 when the cache reports TAOS_LRU_STATUS_OK, else -1.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2003 2004 2005
// Build a noop entry at the current write index and term and cache it. When
// this node is leader, append the entry to the log store.
// The entry is always released before returning: the cache handle if caching
// succeeded, otherwise the entry itself. This includes the append-failure path,
// which previously leaked it.
// Returns 0 on success, -1 if the entry could not be built or appended.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);
  if (pEntry == NULL) {
    return -1;
  }

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sNError(ths, "append noop error");
      ret = -1;
    }
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042
// Handle a heartbeat from the leader and answer with a heartbeat reply.
// If the heartbeat is for our current term and we are not leader, the election
// timer is reset and the leader's minMatchIndex recorded. A follower then
// enqueues a follower-commit local command. A non-follower receiving a
// heartbeat with term >= ours enqueues a step-down local command. Both commands
// are handled later by syncNodeOnLocalCmd.
// Every locally built message is released on all paths. Returns 0 normally,
// -1 if the reply cannot be built.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  if (pMsgReply == NULL) {
    sNError(ths, "failed to build heartbeat reply");
    return -1;
  }
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // follower commit is done asynchronously via a local command
      SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
      if (pSyncMsg == NULL) {
        sNError(ths, "failed to build fc-commit local cmd");
      } else {
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;

        SRpcMsg rpcMsgLocalCmd;
        syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pSyncMsg->fcIndex);
          }
        } else {
          // nobody takes ownership of the message
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }

        syncLocalCmdDestroy(pSyncMsg);
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // step down is done asynchronously via a local command
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    if (pSyncMsg == NULL) {
      sNError(ths, "failed to build step-down local cmd");
    } else {
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;

      SRpcMsg rpcMsgLocalCmd;
      syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
        }
      } else {
        // nobody takes ownership of the message
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }

      syncLocalCmdDestroy(pSyncMsg);
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

// Handle a heartbeat reply on the leader: record when the replying peer last
// answered, so that its liveness can be judged. Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // The replier is the message *source*: syncNodeOnHeartbeat builds the reply
  // with srcId = replier and destId = the leader. Keying by destId would stamp
  // the leader's own slot and never refresh the peer's.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->srcId), pMsg->startTime);

  return 0;
}

M
Minghao Li 已提交
2113
// Execute a locally enqueued command:
//  - SYNC_LOCAL_CMD_STEP_DOWN   -> syncNodeStepDown(sdNewTerm)
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT -> syncNodeFollowerCommit(fcIndex)
// Unknown commands are logged as errors. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  switch (pMsg->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pMsg->sdNewTerm);
      break;
    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pMsg->fcIndex);
      break;
    default:
      sNError(ths, "error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2129 2130 2131 2132 2133 2134 2135 2136 2137 2138
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2139

2140
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
S
Shengliang Guan 已提交
2141
  sNTrace(ths, "on client request");
2142

M
Minghao Li 已提交
2143
  int32_t ret = 0;
2144
  int32_t code = 0;
M
Minghao Li 已提交
2145

M
Minghao Li 已提交
2146
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
2147
  SyncTerm        term = ths->pRaftStore->currentTerm;
2148 2149 2150 2151 2152 2153 2154
  SSyncRaftEntry* pEntry;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }
M
Minghao Li 已提交
2155

2156 2157 2158
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

M
Minghao Li 已提交
2159
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
2160 2161 2162
    // append entry
    code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
2163 2164 2165 2166 2167 2168
      if (ths->replicaNum == 1) {
        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
2169

2170 2171 2172 2173
        return -1;

      } else {
        // del resp mgr, call FpCommitCb
2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
2185
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
2186 2187 2188 2189 2190 2191 2192

        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }

2193 2194
        return -1;
      }
2195
    }
M
Minghao Li 已提交
2196

2197 2198
    // if mulit replica, start replicate right now
    if (ths->replicaNum > 1) {
M
Minghao Li 已提交
2199
      syncNodeReplicate(ths);
2200
    }
2201

2202 2203
    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
2204 2205 2206 2207 2208
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
2209
    }
M
Minghao Li 已提交
2210 2211
  }

2212 2213 2214 2215 2216 2217 2218 2219
  if (pRetIndex != NULL) {
    if (ret == 0 && pEntry != NULL) {
      *pRetIndex = pEntry->index;
    } else {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
  }

2220 2221 2222 2223 2224 2225
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

M
Minghao Li 已提交
2226
  return ret;
2227
}
M
Minghao Li 已提交
2228

S
Shengliang Guan 已提交
2229 2230 2231
const char* syncStr(ESyncState state) {
  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
2232
      return "follower";
S
Shengliang Guan 已提交
2233
    case TAOS_SYNC_STATE_CANDIDATE:
2234
      return "candidate";
S
Shengliang Guan 已提交
2235
    case TAOS_SYNC_STATE_LEADER:
2236
      return "leader";
S
Shengliang Guan 已提交
2237
    default:
2238
      return "error";
S
Shengliang Guan 已提交
2239
  }
M
Minghao Li 已提交
2240
}
2241

2242
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
2243
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
S
Shengliang Guan 已提交
2244
    sNTrace(ths, "I am not follower, can not do leader transfer");
2245 2246
    return 0;
  }
2247 2248

  if (!ths->restoreFinish) {
S
Shengliang Guan 已提交
2249
    sNTrace(ths, "restore not finish, can not do leader transfer");
2250 2251 2252
    return 0;
  }

2253
  if (pEntry->term < ths->pRaftStore->currentTerm) {
2254
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
2255 2256 2257 2258
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
S
Shengliang Guan 已提交
2259
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
2260 2261 2262
    return 0;
  }

2263 2264
  /*
    if (ths->vgId > 1) {
S
Shengliang Guan 已提交
2265
      sNTrace(ths, "I am vnode, can not do leader transfer");
2266 2267 2268 2269
      return 0;
    }
  */

M
Minghao Li 已提交
2270
  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
S
Shengliang Guan 已提交
2271
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);
M
Minghao Li 已提交
2272

M
Minghao Li 已提交
2273 2274 2275
  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;
M
Minghao Li 已提交
2276

M
Minghao Li 已提交
2277 2278
  bool same = sameId || sameNodeInfo;
  if (same) {
M
Minghao Li 已提交
2279 2280 2281 2282
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);
M
Minghao Li 已提交
2283

2284
    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
S
Shengliang Guan 已提交
2285
            pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
2286 2287
  }

M
Minghao Li 已提交
2288
  if (ths->pFsm->FpLeaderTransferCb != NULL) {
S
Shengliang Guan 已提交
2289
    SFsmCbMeta cbMeta = {
S
Shengliang Guan 已提交
2290 2291 2292 2293 2294 2295 2296 2297 2298
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
S
Shengliang Guan 已提交
2299 2300
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
2301 2302
  }

M
Minghao Li 已提交
2303
  syncLeaderTransferDestroy(pSyncLeaderTransfer);
2304 2305 2306
  return 0;
}

2307
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
S
Shengliang Guan 已提交
2308
  for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) {
2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321
    SRaftId raftId;
    raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
    raftId.vgId = ths->vgId;

    if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
      pNewCfg->myIndex = i;
      return 0;
    }
  }

  return -1;
}

2322 2323 2324 2325
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
}

M
Minghao Li 已提交
2326
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
2327 2328 2329 2330
  if (beginIndex > endIndex) {
    return 0;
  }

M
Minghao Li 已提交
2331 2332 2333 2334 2335 2336 2337 2338 2339
  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to sanpshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
S
Shengliang Guan 已提交
2340
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);
2341

M
Minghao Li 已提交
2342 2343 2344
      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
2345 2346
  }

2347 2348
  int32_t    code = 0;
  ESyncState state = flag;
M
Minghao Li 已提交
2349

S
Shengliang Guan 已提交
2350
  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
2351 2352 2353 2354 2355 2356

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        SSyncRaftEntry* pEntry;
2357 2358 2359 2360 2361 2362
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
M
Minghao Li 已提交
2363 2364 2365
          // ASSERT(code == 0);
          // ASSERT(pEntry != NULL);
          if (code != 0 || pEntry == NULL) {
S
Shengliang Guan 已提交
2366
            sNError(ths, "get log entry error");
2367
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
M
Minghao Li 已提交
2368 2369
            continue;
          }
2370
        }
2371

2372
        SRpcMsg rpcMsg = {0};
2373 2374
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

2375
        // user commit
2376 2377
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
S
Shengliang Guan 已提交
2378
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
2379 2380 2381
            internalExecute = false;
          }

S
Shengliang Guan 已提交
2382
          sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);
2383

2384 2385
          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
S
Shengliang Guan 已提交
2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

S
Shengliang Guan 已提交
2398
            syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
S
Shengliang Guan 已提交
2399
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
2400
          }
2401 2402
        }

2403 2404
#if 0
        // execute in pre-commit
M
Minghao Li 已提交
2405
        // leader transfer
2406 2407 2408
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
2409
        }
2410
#endif
2411 2412

        // restore finish
2413
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
2414 2415 2416 2417 2418 2419
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;
M
Minghao Li 已提交
2420

2421
            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
S
Shengliang Guan 已提交
2422
            sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
2423 2424 2425 2426
          }
        }

        rpcFreeCont(rpcMsg.pCont);
2427 2428 2429 2430 2431
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
2432 2433 2434 2435
      }
    }
  }
  return 0;
2436 2437 2438
}

bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
S
Shengliang Guan 已提交
2439
  for (int32_t i = 0; i < ths->replicaNum; ++i) {
2440 2441 2442 2443 2444
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
      return true;
    }
  }
  return false;
M
Minghao Li 已提交
2445 2446 2447 2448
}

SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pSender = NULL;
S
Shengliang Guan 已提交
2449
  for (int32_t i = 0; i < ths->replicaNum; ++i) {
M
Minghao Li 已提交
2450 2451 2452 2453 2454
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      pSender = (ths->senders)[i];
    }
  }
  return pSender;
M
Minghao Li 已提交
2455
}
M
Minghao Li 已提交
2456

2457 2458
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pTimer = NULL;
S
Shengliang Guan 已提交
2459
  for (int32_t i = 0; i < ths->replicaNum; ++i) {
2460 2461 2462 2463 2464 2465 2466
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      pTimer = &((ths->peerHeartbeatTimerArr)[i]);
    }
  }
  return pTimer;
}

M
Minghao Li 已提交
2467 2468
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pState = NULL;
S
Shengliang Guan 已提交
2469
  for (int32_t i = 0; i < ths->replicaNum; ++i) {
M
Minghao Li 已提交
2470 2471 2472 2473 2474 2475 2476 2477 2478
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      pState = &((ths->peerStates)[i]);
    }
  }
  return pState;
}

bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
M
Minghao Li 已提交
2479
  if (pState == NULL) {
2480
    sError("vgId:%d, replica maybe dropped", ths->vgId);
M
Minghao Li 已提交
2481 2482
    return false;
  }
M
Minghao Li 已提交
2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493

  SyncIndex sendIndex = pMsg->prevLogIndex + 1;
  int64_t   tsNow = taosGetTimestampMs();

  if (pState->lastSendIndex == sendIndex && tsNow - pState->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
    return false;
  }

  return true;
}

M
Minghao Li 已提交
2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
    SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
    if (pSyncNode->commitIndex != lastIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

S
Shengliang Guan 已提交
2508
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
2509
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
M
Minghao Li 已提交
2510
    if (pSender != NULL && pSender->start) {
M
Minghao Li 已提交
2511 2512 2513 2514 2515 2516
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
M
Minghao Li 已提交
2517 2518
}

2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  if (timerType == SYNC_TIMEOUT_PING) {
    return "ping";
  } else if (timerType == SYNC_TIMEOUT_ELECTION) {
    return "elect";
  } else if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
    return "heartbeat";
  } else {
    return "unknown";
  }
}

void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
2532
  sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
S
Shengliang Guan 已提交
2533
          syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
2534 2535
}

wafwerar's avatar
wafwerar 已提交
2536
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
M
Minghao Li 已提交
2537 2538 2539
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2540
  sNTrace(pSyncNode,
2541 2542
          "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
          ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
S
Shengliang Guan 已提交
2543 2544
          host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
          pMsg->dataLen, s);
M
Minghao Li 已提交
2545 2546
}

wafwerar's avatar
wafwerar 已提交
2547
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
M
Minghao Li 已提交
2548 2549 2550
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2551 2552

  sNTrace(pSyncNode,
2553 2554
          "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
          ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
S
Shengliang Guan 已提交
2555 2556
          host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
          pMsg->dataLen, s);
M
Minghao Li 已提交
2557 2558
}

2559
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
M
Minghao Li 已提交
2560 2561 2562
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2563 2564

  sNTrace(pSyncNode,
2565
          "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
S
Shengliang Guan 已提交
2566 2567
          "}, %s",
          host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
M
Minghao Li 已提交
2568 2569
}

wafwerar's avatar
wafwerar 已提交
2570
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
M
Minghao Li 已提交
2571 2572 2573
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2574 2575

  sNTrace(pSyncNode,
2576
          "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
S
Shengliang Guan 已提交
2577 2578
          "}, %s",
          host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
M
Minghao Li 已提交
2579
}
2580 2581 2582 2583 2584

void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2585 2586

  sNTrace(pSyncNode,
2587
          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
S
Shengliang Guan 已提交
2588 2589
          "}, %s",
          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
2590 2591 2592 2593 2594 2595
}

void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2596 2597

  sNTrace(pSyncNode,
2598
          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
S
Shengliang Guan 已提交
2599 2600
          "}, %s",
          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
2601 2602 2603 2604 2605 2606
}

void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
2607

2608
  sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
S
Shengliang Guan 已提交
2609
          pMsg->term, pMsg->privateTerm, s);
2610 2611 2612 2613 2614 2615
}

void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
2616
  sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
S
Shengliang Guan 已提交
2617
          pMsg->term, pMsg->privateTerm, s);
M
Minghao Li 已提交
2618
}
2619 2620

void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
2621 2622
  sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
M
Minghao Li 已提交
2623 2624 2625 2626 2627 2628
}

void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
2629
  sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
M
Minghao Li 已提交
2630 2631 2632 2633 2634 2635
}

void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
2636
  sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
M
Minghao Li 已提交
2637 2638 2639 2640 2641 2642
}

void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
2643
  sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
S
Shengliang Guan 已提交
2644
          pMsg->term, pMsg->snapStart, s);
M
Minghao Li 已提交
2645 2646 2647 2648 2649 2650
}

void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
2651
  sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
S
Shengliang Guan 已提交
2652
          port, pMsg->term, pMsg->snapStart, s);
M
Minghao Li 已提交
2653
}
M
Minghao Li 已提交
2654 2655 2656 2657 2658 2659 2660 2661

void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}

void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}

void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}

void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}