syncMain.c 85.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
M
Minghao Li 已提交
48

49
// Open a sync node described by pSyncInfo and register it via syncNodeAdd().
// Returns the registered reference id, or -1 if the node could not be opened
// or registered (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Copy the timing parameters and the message callback supplied by the caller.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  pNode->electBaseLine = pSyncInfo->electMs;

  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
72 73 74 75
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
76 77 78
  }
}

M
Minghao Li 已提交
79
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
80
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
81
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
82
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
83
    syncNodeRemove(rid);
M
Minghao Li 已提交
84
  }
S
Shengliang Guan 已提交
85
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
// Run the pre-close step (syncNodePreClose) on the sync node identified by rid;
// does nothing if the id cannot be acquired.
void syncPreStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  syncNodePreClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
95 96 97
// A new config is accepted only if syncNodeInConfig() holds for this node and
// the replica count differs from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}

S
Shengliang Guan 已提交
100
// Apply a new replica configuration to the sync node identified by rid.
//
// Returns 0 on success. Returns -1 when the node cannot be acquired, or when
// the new config is rejected by syncNodeCheckNewConfig(); in the latter case
// terrno is set to TSDB_CODE_SYN_NEW_CONFIG_ERROR.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: pSyncNode must not be read after
    // syncNodeRelease(), since the node may be gone by then.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // A leader re-initializes every per-peer heartbeat timer slot for the new
    // replica set, then restarts heartbeats and replication.
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
128

S
Shengliang Guan 已提交
129 130 131 132
// Dispatch one sync RPC message to the handler matching pMsg->msgType.
//
// Returns the handler's result code, or -1 when the sync module is not
// initialized, the node cannot be acquired, or the message type is unknown.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    code = syncNodeOnTimer(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    // Propagate the handler result; previously it was dropped and the initial
    // -1 was returned even on success.
    // NOTE(review): assumes this handler returns int32_t like its siblings.
    code = syncNodeOnRequestVote(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    code = syncNodeOnAppendEntries(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);
  } else {
    sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
177
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
178
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
179
  if (pSyncNode == NULL) return -1;
180

S
Shengliang Guan 已提交
181
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
182
  syncNodeRelease(pSyncNode);
183 184 185
  return ret;
}

M
Minghao Li 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
// Return the smallest match index recorded for any peer of this node, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // The first peer seeds the minimum; later peers only lower it.
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

202
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
203
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
204
  if (pSyncNode == NULL) {
205
    sError("sync begin snapshot error");
206 207
    return -1;
  }
208

209 210
  int32_t code = 0;

M
Minghao Li 已提交
211
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
212 213 214
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
215 216 217
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
218 219 220
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
221 222
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
223
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
224 225 226
      return 0;
    }

M
Minghao Li 已提交
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
244 245 246 247
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
248 249
            } while (0);

S
Shengliang Guan 已提交
250
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
251 252 253 254 255 256
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
257 258 259
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
260
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
261 262 263 264
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
265
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
266
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
267 268 269
        return 0;

      } else {
S
Shengliang Guan 已提交
270
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
271
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
272 273 274 275 276 277 278 279 280
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
281 282 283
    }
  }

M
Minghao Li 已提交
284
_DEL_WAL:
285

M
Minghao Li 已提交
286
  do {
287 288 289 290
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
291
      pSyncNode->snapshottingTime = taosGetTimestampMs();
292

M
Minghao Li 已提交
293 294 295
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
S
Shengliang Guan 已提交
296 297
        sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
298
      } else {
S
Shengliang Guan 已提交
299
        sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
S
Shengliang Guan 已提交
300
                terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
301 302
        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
303 304

    } else {
S
Shengliang Guan 已提交
305 306
      sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
              snapshottingIndex, lastApplyIndex);
307
    }
M
Minghao Li 已提交
308
  } while (0);
309

S
Shengliang Guan 已提交
310
  syncNodeRelease(pSyncNode);
311 312 313 314
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
315
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
316
  if (pSyncNode == NULL) {
317
    sError("sync end snapshot error");
318 319 320
    return -1;
  }

321 322 323 324
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
325
    if (code != 0) {
326
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
327
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
328 329
      return -1;
    } else {
S
Shengliang Guan 已提交
330
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
331 332
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
333
  }
334

S
Shengliang Guan 已提交
335
  syncNodeRelease(pSyncNode);
336 337 338
  return code;
}

M
Minghao Li 已提交
339
// Make the sync node identified by rid step down with the given term.
// Returns 0 on success, -1 if the node cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

351 352 353
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
354
    sError("sync ready for read error");
355 356 357 358 359 360 361 362 363 364
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
365 366 367
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
368

369 370 371 372
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
        SSyncRaftEntry* pEntry = NULL;
        int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(
S
Shengliang Guan 已提交
373
                    pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
374 375 376 377 378 379 380
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

          syncEntryDestory(pEntry);
        }
381 382 383 384
      }
    }
  }

385 386 387 388 389 390 391 392
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

393 394 395 396
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
397 398
// Transfer leadership to the first peer in peersNodeInfo.
//
// Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has no peers,
// 0 when this node is not the leader (nothing is done), otherwise the result of
// syncNodeLeaderTransferTo().
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  SNodeInfo target = (pSyncNode->peersNodeInfo)[0];
  return syncNodeLeaderTransferTo(pSyncNode, target);
}

// Propose a leader-transfer message that names newLeader as the next leader.
//
// Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when there is only one
// replica, -1 when the transfer message cannot be built, otherwise the result
// of syncNodePropose().
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Validate the message before it is written to; the original asserted only
  // after pMsg had already been dereferenced.
  if (pMsg == NULL) {
    // NOTE(review): presumably a NULL result means allocation failure — confirm.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sNError(pSyncNode, "failed to build leader transfer msg");
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  // Convert to an rpc message; pMsg is destroyed right after the conversion.
  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
  return ret;
}

435 436
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
437

S
Shengliang Guan 已提交
438
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
439 440 441 442
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
443 444
  }

445
  return state;
M
Minghao Li 已提交
446 447
}

448
#if 0
// NOTE: everything in this region is compiled out.

// Fill pSnapshot from the log entry stored at `index`.
// Returns 0 on success, -1 on an invalid index, unknown rid, or missing entry.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

// Copy the node's last config index into sMeta.
// Returns 0 on success, -1 if the node cannot be acquired.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

// Store in sMeta the largest recorded config index that is <= snapshotIndex
// (falling back to the first recorded config index).
// Returns 0 on success, -1 if the node cannot be acquired.
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];

  for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[i];
    if (candidate > lastIndex && candidate <= snapshotIndex) {
      lastIndex = candidate;
    }
  }

  sMeta->lastConfigIndex = lastIndex;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
519

520 521 522 523
// Return the largest recorded config index that does not exceed
// snapshotLastApplyIndex; if none is larger than the first recorded config
// index, that first index is returned.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];

  // Slot 0 is the starting value, so scanning can begin at slot 1.
  for (int32_t slot = 1; slot < pSyncNode->pRaftCfg->configIndexCount; ++slot) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[slot];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

S
Shengliang Guan 已提交
536
// Fill pEpSet with the endpoints of every replica in the node's raft config.
// inUse is set to the replica following this node (wrapping around).
// If the node cannot be acquired, pEpSet is left with numOfEps == 0.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pSyncNode->pRaftCfg->cfg.replicaNum; ++idx) {
    SEp* pDst = &pEpSet->eps[idx];

    tstrncpy(pDst->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pDst->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;

    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, idx, pDst->fqdn, pDst->port);
  }

  // Guard the modulo against an empty endpoint set.
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
556

M
Minghao Li 已提交
557
// Propose pMsg on the sync node identified by rid.
// Returns -1 if the node cannot be acquired, otherwise the result of
// syncNodePropose().
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak);

  syncNodeRelease(pNode);
  return result;
}

569
// Propose a client message on this node.
//
// Returns:
//   -1  when the node is not leader (terrno = TSDB_CODE_SYN_NOT_LEADER), when it
//       is not restored and vgId != 1 (terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY),
//       or when the optimized path or request serialization fails;
//    1  when the optimized one-replica path handled the message directly
//       (applyIndex/applyTerm are written into pMsg->info.conn);
//   otherwise the result of enqueueing the request through syncEqMsg.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log below never reads an indeterminate value
    // when the handler fails before writing the index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    } else {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }
  } else {
    // Register a response stub first so the request carries its sequence number.
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
    SRpcMsg   rpcMsg = {0};
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
      return -1;
    }

    sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      // The request never reached the queue, so drop its response stub again.
      sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    }

    return code;
  }
}

S
Shengliang Guan 已提交
622
// Reset a per-peer heartbeat timer slot: no live timer, counter and logic clock
// at zero, interval taken from the node's heartbeat baseline, callback set to
// syncNodeEqPeerHeartbeatTimer, destination set to destId. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
632
// Arm the per-peer heartbeat timer described by pSyncTimer.
//
// A new SSyncHbTimerData carrying the node, the timer slot, the destination and
// the current logic clock is allocated, stored in pSyncTimer->pData and passed
// to the timer as its parameter.
//
// Returns 0 when the timer was armed, and also when the sync environment is not
// initialized (only an error is logged, as before). Returns -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when the timer data cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  if (syncIsInit()) {
    SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // Previously the NULL result was dereferenced right away.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return -1;
    }

    pData->pSyncNode = pSyncNode;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;

    pSyncTimer->pData = pData;
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
649
// Disarm a per-peer heartbeat timer.
// The logic clock is advanced before the timer is stopped and the handle is
// cleared afterwards. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // NOTE(review): pSyncTimer->pData is deliberately not freed here (the free was
  // disabled in the original code); presumably it is released elsewhere - confirm.
  return 0;
}

S
Shengliang Guan 已提交
658 659
// Allocate and initialize a sync node from the options in pSyncInfo.
//
// Steps, in order:
//   1. make sure the sync directory exists;
//   2. create raft_config.json from the input options, or reconcile the input
//      options with an existing file (the input wins when it carries replicas
//      and differs from the file, otherwise the file wins);
//   3. copy paths / callbacks, open the raft config, derive my id, peers and
//      replicas;
//   4. take ownership of pSyncInfo->pFsm and create the raft store, vote
//      managers, index managers and log store;
//   5. initialize commit index (from the FSM snapshot if available), timers,
//      response manager, snapshot senders / receiver and bookkeeping fields.
//
// The node is only opened here; timers are started by syncNodeStart().
//
// Returns the new node, or NULL on failure. On failure pSyncInfo->pFsm is
// freed (if still owned by pSyncInfo) and the partially built node is released
// through syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Assign vgId before anything is logged; it used to be set much later, so
  // every early message below reported vgId 0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // The file was just written from pSyncInfo->syncCfg, and pRaftCfg has not
      // been opened yet (still NULL from calloc). The former copy from
      // pSyncNode->pRaftCfg->cfg dereferenced that NULL pointer, so only log.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // the config is re-opened below once all paths are set
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // ownership of the FSM moves to the node; from here on syncNodeClose() frees it
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commit index starts invalid and is raised to the snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  sNTrace(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // pSyncInfo->pFsm is only non-NULL here if ownership was not yet moved to the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
946 947 948 949 950 951 952 953 954 955 956
// Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing if the node has no FSM or the FSM has no
// FpGetSnapshotInfo callback.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // zero-initialized (as in syncNodeOpen) so no indeterminate value is read
    // if the callback fails and ASSERT is compiled out
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    ASSERT(code == 0);
    if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
      pSyncNode->commitIndex = snapshot.lastApplyIndex;
    }
  }
}

M
Minghao Li 已提交
957 958
// Start the raft role for an opened node and arm its ping timer.
// With more than one replica the node starts as follower; a single-replica
// node moves to the next term, becomes leader right away and appends a noop
// entry.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
976 977 978 979 980 981 982 983 984
// Start an opened node in stand-by mode: follower state, heartbeat timers
// stopped, election timer pushed out to TIMER_MAX_MS, ping timer armed.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electCode = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(electCode == 0);

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingCode == 0);
}

M
Minghao Li 已提交
991 992 993 994 995 996 997 998
// Stop the election timer and then the heartbeat timers of the node.
// Nothing else is released here.
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);      // election timer first
  syncNodeStopHeartbeatTimer(pSyncNode);  // then the heartbeat timers
}

M
Minghao Li 已提交
999
// Release everything owned by a sync node and free the node itself.
// A NULL node is accepted and ignored. The teardown order is: raft store,
// response / vote / index managers, log store, raft config, timers, FSM,
// snapshot senders, snapshot receiver, node memory.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;

  sNTrace(pSyncNode, "sync close");

  int32_t code = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(code == 0);
  pSyncNode->pRaftStore = NULL;

  // managers and stores
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  // the node owns its FSM
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  // snapshot senders, one slot per possible replica
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
    if ((pSyncNode->senders)[slot] == NULL) continue;
    snapshotSenderDestroy((pSyncNode->senders)[slot]);
    (pSyncNode->senders)[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1049
// option
M
Minghao Li 已提交
1050 1051
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1052
// Return the snapshot strategy recorded in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1053

M
Minghao Li 已提交
1054 1055 1056
// timer control --------------
// Arm the node's ping timer and align pingTimerLogicClock with
// pingTimerLogicClockUser. When the sync environment is not initialized only
// an error is logged. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Disarm the node's ping timer: advance pingTimerLogicClockUser, stop the
// timer and clear the handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the node's election timer to fire after `ms` milliseconds.
//
// The interval is stored in electTimerMS and a fresh SElectTimer, carrying the
// current electTimerLogicClock and the node pointer, is passed to the timer
// callback.
//
// Returns 0 when the timer was armed or when the sync environment is not
// initialized (an error is logged in that case, as before). Returns -1 with
// terrno set to TSDB_CODE_OUT_OF_MEMORY when the callback data cannot be
// allocated.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    // previously the NULL pointer was dereferenced right below
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start elect timer since out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Disarm the node's election timer: advance electTimerLogicClock, stop the
// timer and clear the handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and arm it again with interval `ms`.
// The results of the two calls are not propagated; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1110 1111
// Restart the election timer with a new timeout: TIMER_MAX_MS for a stand-by
// node, otherwise the value returned by syncUtilElectRandomMS() for the bounds
// electBaseLine and 2 * electBaseLine.
// Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return code;
}

M
Minghao Li 已提交
1126
// Arm the node-level heartbeat timer and align heartbeatTimerLogicClock with
// heartbeatTimerLogicClockUser. When the sync environment is not initialized
// only an error is logged. The trace line is written in both cases.
// Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1140
// Start the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer. Results of the individual starts are
// not propagated; the return value is 0 (the node-level timer path is
// compiled out).
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;
    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1158 1159
// Stop the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer. Always returns 0 (the node-level
// timer path is compiled out).
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;
    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return ret;
}

1177 1178 1179 1180 1181 1182
// Stop all per-peer heartbeat timers, then start them again.
// Always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1183 1184 1185
// utils --------------
// Send pMsg to the member identified by destRaftId through the node's
// syncSendMSg callback. The message content is passed through
// syncUtilMsgHtoN() and the message is marked noResp before sending.
// Returns 0 after the callback was invoked, -1 when no callback is set.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the member described by nodeInfo through the node's
// syncSendMSg callback. The message content is passed through
// syncUtilMsgHtoN() and the message is marked noResp before sending.
// Returns 0 after the callback was invoked, -1 when no callback is set
// (this used to return 0, unlike syncNodeSendMsgById, which hid the failure
// from callers).
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1216
// Tell whether this node is a member of `config`.
// Membership is determined twice - once by comparing fqdn and port with
// myNodeInfo, once by comparing the derived raft id with myRaftId - and the
// two answers are asserted to agree.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  // pass 1: match on fqdn + port
  bool foundByEp = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundByEp; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    foundByEp = (strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                (pInfo->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  // pass 2: match on raft id built from the same fqdn + port and my vgId
  bool foundById = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundById; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    SRaftId          candidate;
    candidate.addr = syncUtilAddr2U64(pInfo->nodeFqdn, pInfo->nodePort);
    candidate.vgId = pSyncNode->vgId;
    foundById = syncUtilSameId(&candidate, &(pSyncNode->myRaftId));
  }

  ASSERT(foundByEp == foundById);
  return foundByEp;
}

1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255
// Compare two sync configs. Returns true when replicaNum or myIndex differ,
// or when any replica entry differs in fqdn or port; false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  // replica counts are equal here, so one bound covers both arrays
  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pPrev = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pNext = &pNewCfg->nodeInfo[idx];
    bool sameEp = (strcmp(pPrev->nodeFqdn, pNext->nodeFqdn) == 0) && (pPrev->nodePort == pNext->nodePort);
    if (!sameEp) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1256
// Apply a new replica configuration to the node.
//
//   pSyncNode             - node to reconfigure
//   pNewConfig            - configuration to switch to
//   lastConfigChangeIndex - log index recorded as the last config change
//
// Nothing happens when the new config equals the current one. Otherwise the
// raft config is replaced and persisted. If this node is a member of the new
// config, its identity, peers, replicas, quorum, index / vote managers and
// snapshot senders are rebuilt (senders of replicas that exist in both configs
// are reused) and the node re-enters its current role (leader or follower).
// If the node was in the old config but is not in the new one it is marked
// stand-by.
//
// Fixes relative to the previous version:
//   - the new config string was written into oldCfgStr, leaving newCfgStr
//     empty and making the "begin" trace print the same string twice;
//   - the "not changed" message hard-coded vgId 1;
//   - sender re-mapping skips NULL old senders and stops at the first match,
//     so a NULL sender is never dereferenced and an already moved sender is
//     never overwritten (which leaked it);
//   - unused locals and the unused _END label were removed.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: reuse the old sender of every replica that is also in the new config
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
        // skip slots whose sender is missing or was already handed to another replica
        if (oldSenders[j] == NULL) continue;
        if (!syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) continue;

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
        sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);

        (pSyncNode->senders)[i] = oldSenders[j];
        oldSenders[j] = NULL;
        reset = true;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                i, host, port, (pSyncNode->senders)[i], reset);

        // one sender per replica: a second match would overwrite (and leak) this one
        break;
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
      }
    }

    // free old
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);
        sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
            pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1426 1427 1428 1429
// raft state change --------------
// Adopt a newer term.
// When `term` is greater than the stored current term: store it through
// raftStoreSetTerm, switch the node to follower, then clear the recorded vote.
// A term that is not newer is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  // the reason string only feeds the trace log of syncNodeBecomeFollower
  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1437 1438 1439 1440 1441 1442
// Store `term` as the current term when it is newer than the stored one.
// Unlike syncNodeUpdateTerm, this neither calls syncNodeBecomeFollower nor
// clears the vote.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1443
// Step down in reaction to `newTerm`.
//   newTerm <  current term : ignored (only traced)
//   newTerm >  current term : store the term, become follower, clear the vote
//   newTerm == current term : become follower unless already a follower
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  // a stale term never forces a step down
  if (newTerm < pSyncNode->pRaftStore->currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
            pSyncNode->pRaftStore->currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
          pSyncNode->pRaftStore->currentTerm);

  // newer term: adopt it first, then change role
  if (newTerm > pSyncNode->pRaftStore->currentTerm) {
    raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);

    raftStoreClearVote(pSyncNode->pRaftStore);
    return;
  }

  // same term: only the role may need to change
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}

1469 1470
// Run syncRespCleanRsp on the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1471
// Switch this node to the follower role.
// `debugStr` is only used in the trace line emitted at the end.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a node that was leader drops its cached leader id
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // role switch; the heartbeat timer is stopped for followers
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // restart the election timer
  syncNodeResetElectTimer(pSyncNode);

  // clean responses still held by the response manager
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine when it registered a callback
  if (pSyncNode->pFsm != NULL) {
    if (pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
      pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
    }
  }

  // forget the min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1517
// Switch this node to the leader role and reset the leader-side bookkeeping:
// next/match indexes, peer states, snapshot receiver and timers.
// `debugStr` is only used in the trace line emitted at the end.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Every replica starts replication at lastIndex + 1 (maybe overwrite myself, no harm).
  // The last index comes from syncNodeGetLastIndexTerm rather than the log store
  // alone because the wal may have been deleted. It does not depend on the
  // replica, so it is looked up once instead of once per loop iteration.
  if (pSyncNode->pNextIndex->replicaNum > 0) {
    SyncIndex lastIndex = SYNC_INDEX_INVALID;
    SyncTerm  lastTerm = 0;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);

    for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
      pSyncNode->pNextIndex->index[i] = lastIndex + 1;
    }
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  sNTrace(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition.
// Asserts that the node is a candidate holding a granted-vote majority, then
// becomes leader, appends a noop entry and tries to advance the commit index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  // replication is only started when there is more than one replica
  const bool hasPeers = (pSyncNode->replicaNum > 1);
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1607 1608
// True when the node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
1609
// Reset all TSDB_MAX_REPLICA peer-state slots: the last send index becomes
// SYNC_INDEX_INVALID and the last send time becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate transition.
// Asserts that the node is currently a follower; only the state field changes.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition.
// Asserts that the node is currently leader and delegates the actual work to
// syncNodeBecomeFollower.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition.
// Asserts that the node is currently a candidate and delegates the actual work
// to syncNodeBecomeFollower.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  sNTrace(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
1637 1638 1639

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1640
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1641 1642
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1643 1644 1645 1646

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1647
// simulate get vote from outside
M
Minghao Li 已提交
1648
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1649
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1650

S
Shengliang Guan 已提交
1651 1652
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1653
  if (ret != 0) return;
S
Shengliang Guan 已提交
1654 1655

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1656 1657 1658 1659 1660 1661 1662
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1663
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1664 1665
}

M
Minghao Li 已提交
1666
// return if has a snapshot
M
Minghao Li 已提交
1667 1668
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1669
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1670 1671
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1672 1673 1674 1675 1676 1677 1678
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1679 1680
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1681
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1682
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1683 1684
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1685 1686 1687 1688 1689 1690 1691
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1692 1693
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1694 1695
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1696 1697
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1698
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1699 1700
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1701 1702
    }

M
Minghao Li 已提交
1703 1704 1705
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1706 1707 1708 1709
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1710
  } else {
M
Minghao Li 已提交
1711 1712
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1713
  }
M
Minghao Li 已提交
1714

M
Minghao Li 已提交
1715 1716 1717 1718 1719 1720 1721
  return lastTerm;
}

// Fetch the last index and the last term (snapshot included) in one call.
// Both output pointers are written unconditionally; always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = lastIndex;

  SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = lastTerm;

  return 0;
}
M
Minghao Li 已提交
1724

M
Minghao Li 已提交
1725
// return append-entries first try index
M
Minghao Li 已提交
1726 1727 1728 1729 1730
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1731 1732
// if index > 0, return index - 1
// else, return -1
1733 1734 1735 1736 1737 1738 1739 1740 1741
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1742 1743 1744 1745
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm        preTerm = 0;
  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
M
Minghao Li 已提交
1759 1760 1761 1762 1763 1764

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1765 1766 1767 1768 1769 1770
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  } else {
1771 1772 1773 1774
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1775 1776 1777 1778
      }
    }
  }

1779
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1780
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1781 1782
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1783 1784 1785 1786

// Fetch the index and the term that precede `index` in one call.
// Both output pointers are written unconditionally; always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
1791
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1792 1793 1794 1795 1796
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1797
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1798 1799 1800 1801 1802
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
      sNError(pNode, "failed to build ping msg");
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1803
    }
M
Minghao Li 已提交
1804

S
Shengliang Guan 已提交
1805 1806 1807 1808 1809 1810
    sNTrace(pNode, "enqueue ping msg");
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
      return;
1811 1812
    }

S
Shengliang Guan 已提交
1813
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1814
  } else {
1815
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
S
Shengliang Guan 已提交
1816
           pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1817 1818 1819 1820
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1821 1822 1823 1824 1825 1826
  if (!syncIsInit()) return;

  SElectTimer* pElectTimer = param;
  SSyncNode*   pNode = pElectTimer->pSyncNode;

  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1827
  int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843

  if (code != 0) {
    sNError(pNode, "failed to build elect msg");
    taosMemoryFree(pElectTimer);
    return;
  }

  SyncTimeout* pTimeout = rpcMsg.pCont;
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
    taosMemoryFree(pElectTimer);
    return;
M
Minghao Li 已提交
1844
  }
M
Minghao Li 已提交
1845 1846

  taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
1847

M
Minghao Li 已提交
1848
#if 0
M
Minghao Li 已提交
1849
  // reset timer ms
S
Shengliang Guan 已提交
1850 1851 1852
  if (syncIsInit() && pNode->electBaseLine > 0) {
    pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
    taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
M
Minghao Li 已提交
1853 1854
  } else {
    sError("sync env is stop, syncNodeEqElectTimer");
M
Minghao Li 已提交
1855
  }
M
Minghao Li 已提交
1856
#endif
M
Minghao Li 已提交
1857 1858
}

M
Minghao Li 已提交
1859
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1860
  if (!syncIsInit()) return;
1861

S
Shengliang Guan 已提交
1862 1863 1864 1865
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1866
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
1867 1868 1869 1870 1871
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sNError(pNode, "failed to build heartbeat msg");
        return;
1872
      }
M
Minghao Li 已提交
1873

S
Shengliang Guan 已提交
1874 1875 1876 1877 1878 1879
      sNTrace(pNode, "enqueue heartbeat timer");
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
        rpcFreeCont(rpcMsg.pCont);
        return;
1880
      }
S
Shengliang Guan 已提交
1881 1882 1883 1884

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1885
    } else {
S
Shengliang Guan 已提交
1886 1887
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1888
    }
M
Minghao Li 已提交
1889 1890 1891
  }
}

1892 1893 1894 1895 1896
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
1897 1898 1899 1900
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1901 1902 1903 1904
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

M
Minghao Li 已提交
1905 1906 1907 1908
  if (pSyncNode->pRaftStore == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1909
  // sNTrace(pSyncNode, "eq peer hb timer");
1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
1921
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
1922 1923 1924 1925 1926 1927 1928
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
1929 1930
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
1931 1932 1933 1934 1935 1936 1937
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
1938
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
1939 1940 1941 1942
      }
#endif

      // send msg
M
Minghao Li 已提交
1943
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
1944 1945 1946

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
1947 1948
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
1949 1950 1951 1952 1953 1954
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
1955
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock,
1956 1957 1958 1959 1960
             msgLogicClock);
    }
  }
}

1961 1962 1963 1964 1965
// Propose a noop entry by enqueueing it as a client request.
// Only a leader may propose: any other state fails with terrno set to
// TSDB_CODE_SYN_NOT_LEADER.
// Returns 0 on success, -1 when the node is not leader or the entry/message
// cannot be built, and the enqueue callback's code when enqueueing fails.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  // the entry is only needed to build the message
  syncEntryDestory(pEntry);
  if (code != 0) {
    sNError(pNode, "failed to build noop client request");
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
    // not enqueued: the content is still ours to release
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

1985 1986 1987
// Deleter passed to taosLRUCacheInsert: frees the cached value.
// `key` and `keyLen` are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

// Insert `pEntry` into the log store's LRU cache, keyed by its index and
// charged with the entry header size plus its data length.
// `h` is handed to taosLRUCacheInsert as the handle out-parameter.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int32_t charge = sizeof(*pEntry) + pEntry->dataLen;

  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
1999 2000 2001
// Build a noop entry at the log's write index, cache it and, when this node is
// leader, append it to the log store.
// Returns 0 on success and -1 when the append fails. The entry reference is
// released on every path: through the cache handle when caching succeeded,
// by destroying the entry otherwise.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  // h stays NULL when the entry could not be cached
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sNError(ths, "append noop error");
      // fall through to the release below instead of returning early
      ret = -1;
    }
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038
// Handle a heartbeat received from `pMsg->srcId`.
//  - Same term and not leader: reset the election timer, take over the
//    sender's minMatchIndex and, as follower, enqueue a local
//    SYNC_LOCAL_CMD_FOLLOWER_CMT command carrying the sender's commit index.
//  - Term not older than ours and not follower: enqueue a local
//    SYNC_LOCAL_CMD_STEP_DOWN command carrying the sender's term.
//  - Always reply to the sender with a heartbeat reply.
// The local commands go through syncEqMsg instead of being executed inline.
// Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // syncNodeFollowerCommit(ths, pMsg->commitIndex);
      SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->fcIndex = pMsg->commitIndex;

      SRpcMsg rpcMsgLocalCmd;
      syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pSyncMsg->fcIndex);
        }
      } else {
        // never enqueued: the content has no other owner
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }

      // the rpc message holds its own content; the command struct is ours
      syncLocalCmdDestroy(pSyncMsg);
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // syncNodeStepDown(ths, pMsg->term);
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      // never enqueued: the content has no other owner
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  /*
    // htonl
    SMsgHead* pHead = rpcMsg.pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = htonl(pHead->vgId);
  */

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

// Handle a heartbeat reply. Always returns 0.
// The receive time is recorded for the peer that sent the reply (srcId).
// syncNodeOnHeartbeat fills destId with the heartbeat's sender, i.e. this
// node, so keying on destId would only ever update our own slot.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->srcId), pMsg->startTime);

  return 0;
}

M
Minghao Li 已提交
2109
// Execute a local command taken from the sync queue.
//   SYNC_LOCAL_CMD_STEP_DOWN    -> syncNodeStepDown with sdNewTerm
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> syncNodeFollowerCommit with fcIndex
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  switch (pMsg->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pMsg->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pMsg->fcIndex);
      break;

    default:
      sNError(ths, "error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2125 2126 2127 2128 2129 2130 2131 2132 2133 2134
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2135

2136
// Handle a client request: wrap it into a raft entry at the log's write index,
// cache the entry and, when this node is leader, append it to the log and
// start replication (several replicas) or advance the commit (single replica).
//
// pRetIndex may be NULL; when the function returns 0 it receives the entry's
// index, and SYNC_INDEX_INVALID when no entry could be built. It is left
// untouched when the append fails.
// Returns 0 on success and -1 when the entry cannot be built or the append
// fails. On append failure with several replicas, FpCommitCb is invoked with
// code -1 before returning.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  // syncCacheEntry dereferences the entry, so a failed build must stop here
  if (pEntry == NULL) {
    sNError(ths, "failed to build raft entry from client request");
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  // h stays NULL when the entry could not be cached
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }

      ret = -1;
      goto _OVER;
    }

    // if multi replica, start replicate right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = pEntry->index;
  }

_OVER:
  // release our reference: through the cache handle when cached, directly otherwise
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}
M
Minghao Li 已提交
2224

S
Shengliang Guan 已提交
2225 2226 2227
// Translate a sync role into its lowercase display name.
// Any value other than follower, candidate or leader yields "error".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    return "follower";
  }
  if (state == TAOS_SYNC_STATE_CANDIDATE) {
    return "candidate";
  }
  if (state == TAOS_SYNC_STATE_LEADER) {
    return "leader";
  }
  return "error";
}
2237

2238
// Process a leader-transfer entry on this node.
//
// The request is ignored (returning 0) unless this node is a follower, has finished
// restoring, and the entry is not older than the current term / last log index.
// When the requested new leader is this node (matched by raft id, or by fqdn + port),
// the election timer is restarted with a 1 ms timeout. FpLeaderTransferCb, if set,
// is then invoked with the entry's metadata.
//
// Always returns 0.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  // stale entry: its term is behind ours
  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  // stale entry: newer entries already exist in the local log
  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  // NOTE: a former guard that rejected the transfer when vgId > 1 (vnode) is disabled.

  SyncLeaderTransfer* pTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  const bool matchesRaftId = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  const bool matchesEndpoint = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                               pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (matchesRaftId || matchesEndpoint) {
    // this node is the transfer target: fire the election timer almost immediately
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .code = 0,
        .state = ths->state,
        .seqNum = pEntry->seqNum,
        .term = pEntry->term,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  syncLeaderTransferDestroy(pTransfer);
  return 0;
}

2303
// Find this node inside pNewCfg and store its position in pNewCfg->myIndex.
// A raft id is built for each configured node from its fqdn/port and this node's vgId,
// then compared with ths->myRaftId.
// Returns 0 when found, -1 when this node is not part of the new configuration
// (myIndex is left untouched in that case).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[pos].nodeFqdn, pNewCfg->nodeInfo[pos].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }

  return -1;
}

2318 2319 2320 2321
// True when all three hold: the group has exactly one replica, the message type is a
// user commit (per syncUtilUserCommit), and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2322
// Apply the log entries in [beginIndex, endIndex] to the state machine.
//
// If the FSM reports a snapshot whose lastApplyIndex already covers beginIndex, the
// range is shortened to start right after the snapshot. Each remaining entry is taken
// from the log store's LRU cache, or read from the log store on a cache miss; entries
// that cannot be read are logged and skipped.
//
// For user-commit entries FpCommitCb is invoked, except on a single-replica node with
// vgId != 1 that has finished restoring (see internalExecute below). The first time the
// entry at the log's last index is processed, FpRestoreFinishCb is called (if set) and
// restoreFinish becomes true.
//
// flag is forwarded unchanged in SFsmCbMeta.flag.
// Returns 0 (also for an empty range), or -1 when ths is NULL.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // nothing to execute without a state machine
  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) {
      continue;
    }

    // Start from NULL so the check after syncLogGetEntry never reads an indeterminate
    // pointer if the log store reports success without filling in the entry.
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    } else {
      int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (code != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      bool internalExecute = true;
      if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
        internalExecute = false;
      }

      sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);

      // execute fsm in apply thread, or execute outside syncPropose
      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

    // leader-transfer entries are not handled here; they are executed in pre-commit

    // restore finish
    // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
      if (ths->restoreFinish == false) {
        if (ths->pFsm->FpRestoreFinishCb != NULL) {
          ths->pFsm->FpRestoreFinishCb(ths->pFsm);
        }
        ths->restoreFinish = true;

        int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
        sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
      }
    }

    rpcFreeCont(rpcMsg.pCont);

    // a cached entry is owned by the cache (release the handle); otherwise destroy the entry read above
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pEntry);
    }
  }

  return 0;
}

// Report whether pRaftId equals one of the replica ids of this node's group.
// Scanning stops at the first match.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;
  while (idx < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      return true;
    }
    ++idx;
  }
  return false;
}

// Look up the snapshot sender stored at the same slot as the replica whose id equals pDestId.
// Returns NULL when no replica id matches. The scan does not stop at the first hit,
// so if several slots match, the last one is returned.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }
  return pFound;
}
M
Minghao Li 已提交
2452

2453 2454
// Return a pointer to the per-peer heartbeat timer stored at the slot of the replica
// whose id equals pDestId, or NULL when no replica id matches.
// Every slot is examined; with several matching slots the last one wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pMatch = NULL;
  for (int32_t slot = 0; slot < ths->replicaNum; slot++) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;
    }
    pMatch = &ths->peerHeartbeatTimerArr[slot];
  }
  return pMatch;
}

M
Minghao Li 已提交
2463 2464
// Return a pointer to the peer-state record stored at the slot of the replica whose id
// equals pDestId, or NULL when no replica id matches.
// Every slot is examined; with several matching slots the last one wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pMatch = NULL;
  for (int32_t slot = 0; slot < ths->replicaNum; slot++) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;
    }
    pMatch = &ths->peerStates[slot];
  }
  return pMatch;
}

// Decide whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for pDestId (an error is logged), or when the
// same index (pMsg->prevLogIndex + 1) was already sent to that peer less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  const SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  const int64_t   nowMs = taosGetTimestampMs();

  const bool alreadySent = (pPeer->lastSendIndex == nextIndex);
  const bool stillFresh = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(alreadySent && stillFresh);
}

M
Minghao Li 已提交
2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503
// Check whether the node is in a state where a change may proceed.
// Refused (false, with an error log) when:
//   - the node is already marked as changing;
//   - commitIndex is at least SYNC_INDEX_BEGIN but differs from the last log index;
//   - any peer has a snapshot sender that is started.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    const SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSnapSender == NULL || !pSnapSender->start) {
      continue;
    }
    sError("sync cannot change3");
    return false;
  }

  return true;
}

2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527
// Short display name for a sync timeout type; "unknown" for anything that is not
// ping, election or heartbeat.
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  switch (timerType) {
    case SYNC_TIMEOUT_PING:
      return "ping";
    case SYNC_TIMEOUT_ELECTION:
      return "elect";
    case SYNC_TIMEOUT_HEARTBEAT:
      return "heartbeat";
    default:
      return "unknown";
  }
}

// Trace-log a received sync timeout message: timer type name, logic clock, timer
// milliseconds and data pointer, followed by the caller-supplied note s.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
          syncTimerTypeStr(pMsg->timeoutType),
          pMsg->logicClock,
          pMsg->timerMS,
          pMsg->data,
          s);
}

2532
// Trace-log an outgoing append-entries reply. The destination address is decoded from
// pMsg->destId.addr into host/port; s is a caller-supplied note appended to the line.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     dstHost[64];
  uint16_t dstPort;
  syncUtilU642Addr(pMsg->destId.addr, dstHost, sizeof(dstHost), &dstPort);

  sNTrace(pSyncNode,
          "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, match:%" PRId64 "}, %s",
          dstHost, dstPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}

wafwerar's avatar
wafwerar 已提交
2543
// Trace-log an incoming append-entries reply. The sender address is decoded from
// pMsg->srcId.addr into host/port; s is a caller-supplied note appended to the line.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     srcHost[64];
  uint16_t srcPort;
  syncUtilU642Addr(pMsg->srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, match:%" PRId64 "}, %s",
          srcHost, srcPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
2553 2554 2555 2556 2557

// Trace-log an outgoing heartbeat: destination host/port (decoded from pMsg->destId.addr),
// term, commit index, min match index and private term, then the caller-supplied note s.
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     dstHost[64];
  uint16_t dstPort;
  syncUtilU642Addr(pMsg->destId.addr, dstHost, sizeof(dstHost), &dstPort);

  sNTrace(pSyncNode,
          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64
          ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s",
          dstHost, dstPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

// Trace-log an incoming heartbeat: sender host/port (decoded from pMsg->srcId.addr),
// term, commit index, min match index and private term, then the caller-supplied note s.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     srcHost[64];
  uint16_t srcPort;
  syncUtilU642Addr(pMsg->srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  sNTrace(pSyncNode,
          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64
          ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s",
          srcHost, srcPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}

// Trace-log an outgoing heartbeat reply: destination host/port (decoded from
// pMsg->destId.addr), term and private term, then the caller-supplied note s.
//
// The address printed is the destination, so the message reads "to", matching the
// other send-side loggers in this file.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);

  sNTrace(pSyncNode, "send sync-heartbeat-reply to %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
          pMsg->term, pMsg->privateTerm, s);
}

// Trace-log an incoming heartbeat reply: sender host/port (decoded from pMsg->srcId.addr),
// term and private term, then the caller-supplied note s.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     srcHost[64];
  uint16_t srcPort;
  syncUtilU642Addr(pMsg->srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s",
          srcHost, srcPort, pMsg->term, pMsg->privateTerm, s);
}
2592 2593

// Trace-log a received local command: numeric cmd and its name (from syncLocalCmdGetStr),
// sdNewTerm and fcIndex, then the caller-supplied note s.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd),
          pMsg->sdNewTerm,
          pMsg->fcIndex,
          s);
}

// Trace-log an outgoing pre-snapshot message: destination host/port (decoded from
// pMsg->destId.addr) and term, then the caller-supplied note s.
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     dstHost[64];
  uint16_t dstPort;
  syncUtilU642Addr(pMsg->destId.addr, dstHost, sizeof(dstHost), &dstPort);

  sNTrace(pSyncNode,
          "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s",
          dstHost, dstPort, pMsg->term, s);
}

// Trace-log an incoming pre-snapshot message: sender host/port (decoded from
// pMsg->srcId.addr) and term, then the caller-supplied note s.
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     srcHost[64];
  uint16_t srcPort;
  syncUtilU642Addr(pMsg->srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  sNTrace(pSyncNode,
          "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s",
          srcHost, srcPort, pMsg->term, s);
}

// Trace-log an outgoing pre-snapshot reply: destination host/port (decoded from
// pMsg->destId.addr), term and snapStart, then the caller-supplied note s.
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     dstHost[64];
  uint16_t dstPort;
  syncUtilU642Addr(pMsg->destId.addr, dstHost, sizeof(dstHost), &dstPort);

  sNTrace(pSyncNode,
          "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          dstHost, dstPort, pMsg->term, pMsg->snapStart, s);
}

// Trace-log an incoming pre-snapshot reply: sender host/port (decoded from
// pMsg->srcId.addr), term and snapStart, then the caller-supplied note s.
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     srcHost[64];
  uint16_t srcPort;
  syncUtilU642Addr(pMsg->srcId.addr, srcHost, sizeof(srcHost), &srcPort);

  sNTrace(pSyncNode,
          "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s",
          srcHost, srcPort, pMsg->term, pMsg->snapStart, s);
}
M
Minghao Li 已提交
2627 2628 2629 2630 2631 2632 2633 2634

// Placeholder with an empty body: nothing is logged and none of the parameters are used.
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}

// Placeholder with an empty body: nothing is logged and none of the parameters are used.
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}

// Placeholder with an empty body: nothing is logged and none of the parameters are used.
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}

// Placeholder with an empty body: nothing is logged and none of the parameters are used.
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}