syncMain.c 123.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
// Forward declarations of file-local (static) helpers; their definitions are not in this part of the file.
// NOTE(review): the (param, tmrId) signatures look like timer callbacks - confirm against the definitions.
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
// Forward declaration of a file-local helper (presumably the per-peer heartbeat timer callback - confirm).
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
// Forward declaration of a file-local helper taking the old and the new sync configuration.
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
M
Minghao Li 已提交
45

46
/**
 * Open a sync node from pSyncInfo and register it via syncNodeAdd().
 *
 * @param pSyncInfo  Node description; pingMs, electMs, heartbeatMs and msgcb are
 *                   copied into the node once it has been registered.
 * @return The reference id stored in the node, or -1 when the node could not be
 *         opened or registered.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // Registration failed: close the node that was just opened.
    syncNodeClose(pNode);
    return -1;
  }

  // Ping timing.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // Election timing.
  pNode->electBaseLine = pSyncInfo->electMs;

  // Heartbeat timing.
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
67

M
Minghao Li 已提交
68
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
69 70 71 72
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
73 74 75
  }
}

M
Minghao Li 已提交
76
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
77
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
78
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
79
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
80
    syncNodeRemove(rid);
M
Minghao Li 已提交
81
  }
S
Shengliang Guan 已提交
82
}
M
Minghao Li 已提交
83

M
Minghao Li 已提交
84 85 86 87 88 89 90 91 92
/**
 * Run syncNodePreClose() on the sync node identified by rid.
 * Does nothing when the node cannot be acquired.
 */
void syncPreStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodePreClose(pNode);
    syncNodeRelease(pNode);
  }
}

S
Shengliang Guan 已提交
93 94 95
/**
 * Check whether pCfg is acceptable as the next configuration of pSyncNode.
 *
 * @return true only if syncNodeInConfig() accepts the node for pCfg and the
 *         replica count differs from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
98
/**
 * Apply a new configuration to the sync node identified by rid.
 *
 * When the node is currently the leader, its heartbeat timer is stopped, every
 * per-peer heartbeat timer slot is re-initialised, the heartbeat timer is
 * restarted and replication is triggered.
 *
 * @param rid      Reference id of the sync node.
 * @param pNewCfg  Configuration to switch to.
 * @return 0 on success; -1 if the node cannot be acquired, or if the new
 *         configuration is rejected (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: the node must not be touched after release.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // Re-initialise every slot, not only the ones used by the current replica set.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
126

S
Shengliang Guan 已提交
127 128 129 130
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

S
Shengliang Guan 已提交
131
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
  if (pSyncNode == NULL) return code;

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimer(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing* pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPing(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply* pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest* pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);
  } else {
    sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
M
Minghao Li 已提交
189 190
  }

S
Shengliang Guan 已提交
191
  syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
192
  return code;
193 194
}

S
Shengliang Guan 已提交
195
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
196
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
197
  if (pSyncNode == NULL) return -1;
198

S
Shengliang Guan 已提交
199
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
200
  syncNodeRelease(pSyncNode);
201 202 203
  return ret;
}

M
Minghao Li 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
/**
 * Return the smallest match index recorded for any peer of pSyncNode.
 *
 * @return The minimum over all peers, or SYNC_INDEX_INVALID when the node has
 *         no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // The first peer seeds the minimum; later peers only lower it.
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

220 221 222 223 224 225 226 227 228 229 230 231 232 233
char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
  int32_t len = 128;
  int32_t useLen = 0;
  int32_t leftLen = len - useLen;
  char*   pStr = taosMemoryMalloc(len);
  memset(pStr, 0, len);

  char*   p = pStr;
  int32_t use = snprintf(p, leftLen, "{");
  useLen += use;
  leftLen -= use;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i]));
M
Minghao Li 已提交
234
    if (pState == NULL) {
235
      sError("vgId:%d, replica maybe dropped", pSyncNode->vgId);
M
Minghao Li 已提交
236 237
      break;
    }
238 239

    p = pStr + useLen;
S
Shengliang Guan 已提交
240
    use = snprintf(p, leftLen, "%d:%" PRId64 " ,%" PRId64, i, pState->lastSendIndex, pState->lastSendTime);
241 242 243 244 245 246 247 248 249 250 251 252 253 254
    useLen += use;
    leftLen -= use;
  }

  p = pStr + useLen;
  use = snprintf(p, leftLen, "}");
  useLen += use;
  leftLen -= use;

  // sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr);

  return pStr;
}

255
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
256
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
257 258 259 260 261 262 263
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  int32_t code = 0;

M
Minghao Li 已提交
264
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
265 266 267
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
268 269 270
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
271 272 273
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
M
Minghao Li 已提交
274
      char logBuf[256];
S
Shengliang Guan 已提交
275 276 277
      snprintf(logBuf, sizeof(logBuf),
               "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex,
               logNum, isEmpty);
M
Minghao Li 已提交
278 279
      syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
280
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
281 282 283
      return 0;
    }

M
Minghao Li 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
              char logBuf[256];
              snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
303 304
                       "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                       " of %s:%d, do not delete wal",
M
Minghao Li 已提交
305 306 307 308
                       lastApplyIndex, matchIndex, host, port);
              syncNodeEventLog(pSyncNode, logBuf);
            } while (0);

S
Shengliang Guan 已提交
309
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
310 311 312 313 314 315 316 317
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
          char logBuf[256];
          snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
318 319
                   "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                   lastApplyIndex, pSyncNode->minMatchIndex);
M
Minghao Li 已提交
320 321
          syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
322
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
323 324 325 326
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
327
        char logBuf[256];
S
Shengliang Guan 已提交
328
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
329 330
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
331
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
332 333 334 335
        return 0;

      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
336 337
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " unknown state, do not delete wal",
                 lastApplyIndex);
M
Minghao Li 已提交
338 339
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
340
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
341 342 343 344 345 346 347 348 349
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
350 351 352
    }
  }

M
Minghao Li 已提交
353
_DEL_WAL:
354

M
Minghao Li 已提交
355
  do {
356 357 358 359
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
360
      pSyncNode->snapshottingTime = taosGetTimestampMs();
361

M
Minghao Li 已提交
362 363 364
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
365
        char logBuf[256];
S
Shengliang Guan 已提交
366
        snprintf(logBuf, sizeof(logBuf), "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
367 368 369
                 pSyncNode->snapshottingIndex, lastApplyIndex);
        syncNodeEventLog(pSyncNode, logBuf);

M
Minghao Li 已提交
370 371
      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
372 373 374
        snprintf(logBuf, sizeof(logBuf),
                 "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64, terrstr(terrno),
                 pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
375 376 377 378
        syncNodeErrorLog(pSyncNode, logBuf);

        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
379 380

    } else {
381
      char logBuf[256];
S
Shengliang Guan 已提交
382 383 384
      snprintf(logBuf, sizeof(logBuf),
               "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64, snapshottingIndex,
               lastApplyIndex);
385
      syncNodeEventLog(pSyncNode, logBuf);
386
    }
M
Minghao Li 已提交
387
  } while (0);
388

S
Shengliang Guan 已提交
389
  syncNodeRelease(pSyncNode);
390 391 392 393
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
394
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
395 396 397 398 399 400
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

401 402 403 404
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
405
    if (code != 0) {
406
      sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr());
M
Minghao Li 已提交
407

S
Shengliang Guan 已提交
408
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
409 410 411 412
      return -1;
    } else {
      do {
        char logBuf[256];
S
Shengliang Guan 已提交
413 414
        snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%" PRId64,
                 atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
415 416
        syncNodeEventLog(pSyncNode, logBuf);
      } while (0);
417

M
Minghao Li 已提交
418 419
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
420
  }
421

S
Shengliang Guan 已提交
422
  syncNodeRelease(pSyncNode);
423 424 425
  return code;
}

M
Minghao Li 已提交
426
/**
 * Call syncNodeStepDown() with newTerm on the node identified by rid.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR when the
 *         node cannot be acquired.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pNode->rid);

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);

  return 0;
}

M
Minghao Li 已提交
440 441
/**
 * Start a leader transfer to the first entry of peersNodeInfo.
 *
 * @return The result of syncNodeLeaderTransferTo(); -1 with
 *         terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has no peers.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum != 0) {
    SNodeInfo target = pSyncNode->peersNodeInfo[0];
    return syncNodeLeaderTransferTo(pSyncNode, target);
  }

  sDebug("only one replica, cannot leader transfer");
  terrno = TSDB_CODE_SYN_ONE_REPLICA;
  return -1;
}

/**
 * Propose a leader-transfer message that names newLeader as the next leader.
 *
 * @param pSyncNode  Node that proposes the transfer.
 * @param newLeader  Node info (fqdn/port) of the intended new leader.
 * @return The result of syncNodePropose(); -1 with
 *         terrno = TSDB_CODE_SYN_ONE_REPLICA when there is only one replica, or
 *         -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR when the message could
 *         not be built.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  int32_t ret = 0;

  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  do {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
    syncNodeEventLog(pSyncNode, logBuf);
  } while (0);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Validate before the first dereference; an assert placed after the writes cannot catch anything.
  ASSERT(pMsg != NULL);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  ret = syncNodePropose(pSyncNode, &rpcMsg, false);
  return ret;
}

480
bool syncCanLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
481
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
482 483 484
  if (pSyncNode == NULL) {
    return false;
  }
M
Minghao Li 已提交
485
  ASSERT(rid == pSyncNode->rid);
486 487

  if (pSyncNode->replicaNum == 1) {
S
Shengliang Guan 已提交
488
    syncNodeRelease(pSyncNode);
489 490 491 492
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
S
Shengliang Guan 已提交
493
    syncNodeRelease(pSyncNode);
494 495 496 497 498 499 500 501 502 503 504 505 506 507
    return true;
  }

  bool matchOK = true;
  if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncIndex myCommitIndex = pSyncNode->commitIndex;
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
      if (peerMatchIndex < myCommitIndex) {
        matchOK = false;
      }
    }
  }

S
Shengliang Guan 已提交
508
  syncNodeRelease(pSyncNode);
509 510 511
  return matchOK;
}

512
/**
 * Forward pMsg by proposing it; this is a direct pass-through to syncPropose().
 *
 * @return Whatever syncPropose() returns.
 */
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
516

517 518
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
519

S
Shengliang Guan 已提交
520
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
521 522 523 524
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
525 526
  }

527
  return state;
M
Minghao Li 已提交
528 529
}

530
#if 0
531 532 533 534 535
/**
 * Fill pSnapshot from the log entry stored at index.
 * (Currently compiled out by the enclosing #if 0.)
 *
 * On success data is NULL, lastApplyIndex is index, lastApplyTerm is the
 * entry's term and lastConfigIndex comes from syncNodeGetSnapshotConfigIndex().
 *
 * @return 0 on success; -1 when index is below SYNC_INDEX_BEGIN, the node
 *         cannot be acquired, or the entry cannot be read.
 */
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }
  ASSERT(rid == pNode->rid);

  int32_t         result = -1;
  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry);

  if (code == 0) {
    ASSERT(pEntry != NULL);

    pSnapshot->data = NULL;
    pSnapshot->lastApplyIndex = index;
    pSnapshot->lastApplyTerm = pEntry->term;
    pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

    syncEntryDestory(pEntry);
    result = 0;
  } else if (pEntry != NULL) {
    // The lookup failed but still handed back an entry: free it.
    syncEntryDestory(pEntry);
  }

  syncNodeRelease(pNode);
  return result;
}

563
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
564
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
565 566 567
  if (pSyncNode == NULL) {
    return -1;
  }
M
Minghao Li 已提交
568
  ASSERT(rid == pSyncNode->rid);
569 570
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

S
Shengliang Guan 已提交
571
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
572

S
Shengliang Guan 已提交
573
  syncNodeRelease(pSyncNode);
574 575 576
  return 0;
}

577
/**
 * Store in sMeta->lastConfigIndex the largest recorded config index that does
 * not exceed snapshotIndex; the first recorded index is the fallback.
 * (Currently compiled out by the enclosing #if 0.)
 *
 * @return 0 on success, -1 when the node cannot be acquired.
 */
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return -1;
  }
  ASSERT(rid == pNode->rid);

  ASSERT(pNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = (pNode->pRaftCfg->configIndexArr)[0];

  for (int k = 0; k < pNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = (pNode->pRaftCfg->configIndexArr)[k];
    if (candidate > best && candidate <= snapshotIndex) {
      best = candidate;
    }
  }

  sMeta->lastConfigIndex = best;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
600
#endif
601

602 603 604 605 606 607 608 609 610 611
/**
 * Find the config index that applies to a snapshot ending at
 * snapshotLastApplyIndex.
 *
 * @return The largest recorded config index that does not exceed
 *         snapshotLastApplyIndex; the first recorded index when none
 *         qualifies. The result is also written to the trace log.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];

  // Element 0 is the starting value, so the scan can begin at 1.
  for (int k = 1; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[k];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

618
#if 0
M
Minghao Li 已提交
619
SyncTerm syncGetMyTerm(int64_t rid) {
S
Shengliang Guan 已提交
620
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
621 622 623
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
624
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
625
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
626

S
Shengliang Guan 已提交
627
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
628
  return term;
M
Minghao Li 已提交
629 630
}

631
SyncIndex syncGetLastIndex(int64_t rid) {
S
Shengliang Guan 已提交
632
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
633 634 635 636 637 638
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);

S
Shengliang Guan 已提交
639
  syncNodeRelease(pSyncNode);
640 641 642 643
  return lastIndex;
}

SyncIndex syncGetCommitIndex(int64_t rid) {
S
Shengliang Guan 已提交
644
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
645 646 647 648 649 650
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex cmtIndex = pSyncNode->commitIndex;

S
Shengliang Guan 已提交
651
  syncNodeRelease(pSyncNode);
652 653 654
  return cmtIndex;
}

M
Minghao Li 已提交
655
SyncGroupId syncGetVgId(int64_t rid) {
S
Shengliang Guan 已提交
656
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
657
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
658 659
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
660
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
661
  SyncGroupId vgId = pSyncNode->vgId;
M
Minghao Li 已提交
662

S
Shengliang Guan 已提交
663
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
664
  return vgId;
M
Minghao Li 已提交
665 666
}

M
Minghao Li 已提交
667
/**
 * Fill pEpSet with one endpoint (fqdn, port) per replica of the node's raft
 * configuration and mark this node's own index as the one in use.
 * (Currently compiled out by the enclosing #if 0.)
 *
 * When the node cannot be acquired, *pEpSet is zeroed instead.
 */
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pNode->rid);

  pEpSet->numOfEps = 0;
  for (int k = 0; k < pNode->pRaftCfg->cfg.replicaNum; ++k) {
    snprintf(pEpSet->eps[k].fqdn, sizeof(pEpSet->eps[k].fqdn), "%s", (pNode->pRaftCfg->cfg.nodeInfo)[k].nodeFqdn);
    pEpSet->eps[k].port = (pNode->pRaftCfg->cfg.nodeInfo)[k].nodePort;
    pEpSet->numOfEps += 1;
    sInfo("vgId:%d, sync get epset: index:%d %s:%d", pNode->vgId, k, pEpSet->eps[k].fqdn, pEpSet->eps[k].port);
  }

  pEpSet->inUse = pNode->pRaftCfg->cfg.myIndex;
  sInfo("vgId:%d, sync get epset in-use:%d", pNode->vgId, pEpSet->inUse);

  syncNodeRelease(pNode);
}
686
#endif
M
Minghao Li 已提交
687

688
/**
 * Fill pEpSet with one endpoint (fqdn, port) per replica of the node's raft
 * configuration, for use when a request has to be retried elsewhere.
 *
 * inUse is set to the replica following this node's own index (wrapping
 * around), or to 0 when the configuration holds no replicas, so it is never
 * left unset. When the node cannot be acquired, *pEpSet is zeroed instead.
 *
 * @param rid     Reference id of the sync node.
 * @param pEpSet  Output endpoint set; must not be NULL.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pSyncNode->rid);

  pEpSet->numOfEps = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
    pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
    (pEpSet->numOfEps)++;
    sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
          pEpSet->eps[i].port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  } else {
    // No endpoints: give inUse a defined value, because it is logged below and read by the caller.
    pEpSet->inUse = 0;
  }
  sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);

  syncNodeRelease(pSyncNode);
}
S
Shengliang Guan 已提交
710
/**
 * Remove the response stub stored for index from the node's response manager
 * and, when syncRespMgrGetAndDel() reports 1, copy its RPC handle info into
 * *pInfo. Otherwise *pInfo is left untouched. The handle is traced either way.
 */
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
  SRespStub respStub;

  if (syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &respStub) == 1) {
    *pInfo = respStub.rpcMsg.info;
  }

  sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
}

char* sync2SimpleStr(int64_t rid) {
S
Shengliang Guan 已提交
721
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
722
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
723
    sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid);
M
Minghao Li 已提交
724 725
    return NULL;
  }
M
Minghao Li 已提交
726
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
727
  char* s = syncNode2SimpleStr(pSyncNode);
S
Shengliang Guan 已提交
728
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
729 730 731 732

  return s;
}

M
Minghao Li 已提交
733
/**
 * Propose pMsg on the sync node identified by rid.
 *
 * @param rid     Reference id of the sync node.
 * @param pMsg    Message to propose.
 * @param isWeak  Passed through unchanged to syncNodePropose().
 * @return The result of syncNodePropose(); -1 with
 *         terrno = TSDB_CODE_SYN_INTERNAL_ERROR when the node cannot be acquired.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    // Nothing was acquired, so there is nothing to release.
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
  syncNodeRelease(pSyncNode);
  return ret;
}

747
/**
 * Tell whether a set of messages may be handled as one batch.
 *
 * @param pMsgPArr  array of `arrSize` message pointers
 * @param arrSize   number of entries in pMsgPArr
 * @return false as soon as any message is a TDMT_SYNC_CONFIG_CHANGE or a
 *         TDMT_SYNC_CONFIG_CHANGE_FINISH; true otherwise (including arrSize <= 0).
 */
static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
  int32_t idx = 0;

  while (idx < arrSize) {
    const SRpcMsg* pCur = pMsgPArr[idx];

    // configuration-change messages disqualify the whole batch
    if (pCur->msgType == TDMT_SYNC_CONFIG_CHANGE || pCur->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
      return false;
    }

    ++idx;
  }

  return true;
}

761
/**
 * Propose a client message on this node.
 *
 * The proposal is rejected (return -1, terrno set) when
 *   - the node is not leader                               (TSDB_CODE_SYN_NOT_LEADER),
 *   - a config change is in progress and the message is not
 *     TDMT_SYNC_CONFIG_CHANGE_FINISH                        (TSDB_CODE_SYN_PROPOSE_NOT_READY),
 *   - the message is a config change and syncNodeCanChange()
 *     refuses it                                           (TSDB_CODE_SYN_RECONFIG_NOT_READY),
 *   - restore has not finished and vgId != 1               (TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * Otherwise a response stub is registered, the message is wrapped into a
 * SyncClientRequest and either applied directly (optimized one-replica path)
 * or handed to the syncEqMsg callback.
 *
 * @param pSyncNode  node to propose on
 * @param pMsg       client message; on the optimized path its
 *                   info.conn.applyIndex / applyTerm are filled in
 * @param isWeak     forwarded to syncClientRequestBuild2()
 * @return  0  message enqueued through syncEqMsg
 *          1  message handled directly on the optimized one-replica path
 *         -1  failure, terrno is set
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  char eventLog[128];
  snprintf(eventLog, sizeof(eventLog), "propose message, type:%s", TMSG_INFO(pMsg->msgType));
  syncNodeEventLog(pSyncNode, eventLog);

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state),
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sError("vgId:%d, failed to sync propose since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // config change
  const bool isConfigChange = (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE);
  if (isConfigChange && !syncNodeCanChange(pSyncNode)) {
    terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
    sError("vgId:%d, failed to sync reconfig since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // Latch the "changing" flag only after every pre-check has passed. Setting it
  // before the not-restored check would leave the node stuck in the changing
  // state (rejecting every later proposal) although nothing was proposed.
  // NOTE(review): the flag is still left set if the hand-off below fails -- confirm
  // whether it should be rolled back in that case.
  if (isConfigChange) {
    ASSERT(!pSyncNode->changing);
    pSyncNode->changing = true;
  }

  SRespStub stub;
  stub.createTime = taosGetTimestampMs();
  stub.rpcMsg = *pMsg;
  uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

  SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
  SRpcMsg            rpcMsg;
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);

  int32_t ret = 0;

  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // optimized one replica: the request is applied directly from pSyncMsg
    SyncIndex retIndex = SYNC_INDEX_INVALID;  // defined value for the error log below
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      ret = 1;
      sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
    } else {
      ret = -1;
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
    }

    // rpcMsg is never handed to anybody on this path and the caller gets its
    // answer through the return value, so release both on success AND failure.
    rpcFreeCont(rpcMsg.pCont);
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);

  } else if (pSyncNode->syncEqMsg == NULL) {
    // no enqueue callback: the message content never left this function
    rpcFreeCont(rpcMsg.pCont);
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    ret = -1;
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);

  } else if ((*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
    ret = 0;

  } else {
    // The stub will never be answered through the queue; drop it.
    // NOTE(review): rpcMsg.pCont is not freed here because ownership after a
    // failed enqueue is decided by the callback -- confirm it releases the content.
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    ret = -1;
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
  }

  syncClientRequestDestroy(pSyncMsg);
  return ret;
}

853 854 855 856 857 858 859 860 861 862 863 864
/**
 * Put a per-peer heartbeat timer into its initial, not-running state.
 *
 * @param pSyncNode   node supplying the heartbeat interval (hbBaseLine)
 * @param pSyncTimer  timer to initialize
 * @param destId      raft id of the peer this timer is for
 * @return always 0
 */
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // static configuration
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state: no timer armed, nothing fired yet
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

/**
 * Arm a per-peer heartbeat timer.
 *
 * A fresh SSyncHbTimerData is allocated, filled with the node, the timer, the
 * destination id and the timer's current logic clock, stored in
 * pSyncTimer->pData and passed to the timer callback as its parameter.
 *
 * @param pSyncNode   owning node
 * @param pSyncTimer  timer to (re)arm
 * @return 0 when the timer was armed or the sync environment is not
 *         initialized (an error is logged in the latter case);
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the callback data
 *         cannot be allocated (the timer is left untouched).
 *
 * NOTE(review): a previously stored pSyncTimer->pData is overwritten without
 * being freed here -- presumably the timer callback owns it; confirm.
 */
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
  if (pData == NULL) {
    // do not dereference a failed allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start ctrl hb timer error, out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

/**
 * Stop a per-peer heartbeat timer.
 *
 * The timer's logic clock is incremented, the underlying timer is stopped and
 * the handle is cleared. pSyncTimer->pData is deliberately left alone.
 *
 * @param pSyncNode   owning node (unused)
 * @param pSyncTimer  timer to stop
 * @return always 0
 */
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // presumably lets callbacks armed with the old clock value be recognized as stale
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  return 0;
}

S
Shengliang Guan 已提交
889 890
/**
 * Create and initialize a sync node from `pSyncInfo`.
 *
 * Steps, in order:
 *   1. make sure pSyncInfo->path exists;
 *   2. create raft_config.json from pSyncInfo->syncCfg if it does not exist,
 *      otherwise reconcile pSyncInfo->syncCfg with the persisted config
 *      (a changed, non-empty input config wins and is persisted; otherwise the
 *      persisted config is copied into pSyncInfo->syncCfg);
 *   3. copy paths, WAL, message callbacks; open the raft config;
 *   4. derive my/peer/replica raft ids;
 *   5. open the raft store and create vote, index, log-store and response
 *      managers; seed commitIndex from the FSM snapshot if available;
 *   6. initialize (but do not start) ping/elect/heartbeat timers, snapshot
 *      senders/receiver and misc state.
 *
 * Ownership of pSyncInfo->pFsm: on success it is moved into the node and
 * pSyncInfo->pFsm is set to NULL; on failure it is freed here.
 *
 * @param pSyncInfo  open parameters; syncCfg and pFsm may be modified
 * @return the new node, or NULL on failure (the partially built node is
 *         closed with syncNodeClose()).
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set the vgId first so that every log line below reports the real group id
  // instead of the zero left by calloc.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // Neither an input config nor a persisted one: there is nothing to build the
    // node from. (pSyncNode->pRaftCfg is still NULL at this point, so it cannot
    // be used as a fallback source.)
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, sync config not input and no raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // the config is reopened below once all paths are set up
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId: every replica except myself, in config order
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int j = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // from here on the node owns the FSM; the error path frees it via syncNodeClose()
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commit index starts invalid and is raised to the snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // init callback
  pSyncNode->FpOnPing = syncNodeOnPing;
  pSyncNode->FpOnPingReply = syncNodeOnPingReply;
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
  pSyncNode->FpOnTimeout = syncNodeOnTimer;
  pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;
  pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVote;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply;

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  syncNodeEventLog(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo only if it was not moved into the node yet
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199
/**
 * Raise the node's commitIndex to the FSM snapshot's lastApplyIndex when the
 * snapshot is ahead of it.
 *
 * Does nothing when the node has no FSM or the FSM has no FpGetSnapshotInfo
 * callback.
 *
 * @param pSyncNode  node to update
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    // Zero-initialized (as in syncNodeOpen) so the comparison below never reads
    // indeterminate memory, even if the callback fails and ASSERT is compiled out.
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    ASSERT(code == 0);
    if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
      pSyncNode->commitIndex = snapshot.lastApplyIndex;
    }
  }
}

M
Minghao Li 已提交
1200 1201
/**
 * Start raft on an opened node and arm its ping timer.
 *
 * A node with more than one replica starts as follower. A single-replica node
 * moves to the next term, becomes leader immediately, appends a no-op entry
 * and tries to advance the commit index.
 *
 * @param pSyncNode  node to start
 */
void syncNodeStart(SSyncNode* pSyncNode) {
  const bool singleReplica = (pSyncNode->replicaNum == 1);

  if (!singleReplica) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

M
Minghao Li 已提交
1219 1220 1221 1222 1223 1224 1225 1226 1227
/**
 * Start a node in stand-by mode: follower state, heartbeat timers stopped,
 * election timer re-armed with TIMER_MAX_MS, ping timer started.
 *
 * @param pSyncNode  node to start
 */
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(ret == 0);

  ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

M
Minghao Li 已提交
1234 1235 1236 1237 1238 1239 1240 1241
/**
 * Quiesce a node ahead of syncNodeClose(): stops the election timer, then the
 * heartbeat timers. The ping timer is not touched here.
 *
 * @param pSyncNode  node being shut down
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);      // election timer first
  syncNodeStopHeartbeatTimer(pSyncNode);  // then the heartbeat timers
}

M
Minghao Li 已提交
1242
/**
 * Tear down a sync node and free it. Accepts NULL (no-op).
 *
 * Order: close the raft store, destroy the response/vote/index managers, the
 * log store and the raft config, stop ping/elect/heartbeat timers, free the
 * FSM, destroy snapshot senders and the new-node receiver, free the node.
 *
 * @param pSyncNode  node to close; invalid after the call
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  syncNodeEventLog(pSyncNode, "sync close");

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);

  // managers and stores; each pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  // state machine handed over in syncNodeOpen
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  // snapshot senders: slots may be empty
  for (int slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
    if ((pSyncNode->senders)[slot] == NULL) {
      continue;
    }
    snapshotSenderDestroy((pSyncNode->senders)[slot]);
    (pSyncNode->senders)[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1291
// option
M
Minghao Li 已提交
1292 1293
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1294
/**
 * @param pSyncNode  node with an opened raft config (pRaftCfg is dereferenced)
 * @return the snapshot strategy recorded in the node's raft config
 */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1295

M
Minghao Li 已提交
1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
// ping --------------
/**
 * Send a ping message to one raft member.
 *
 * The ping is logged, converted into an SRpcMsg, logged again in RPC form and
 * handed to syncNodeSendMsgById().
 *
 * @param pSyncNode   sending node
 * @param destRaftId  destination member
 * @param pMsg        ping to send; not destroyed here
 * @return the result of syncNodeSendMsgById()
 */
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

/**
 * Build a ping addressed from this node to itself, send it and destroy it.
 *
 * @param pSyncNode  node to ping
 * @return the result of syncNodePing() (asserted to be 0)
 */
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SRaftId*  pSelfId = &pSyncNode->myRaftId;
  SyncPing* pMsg = syncPingBuild3(pSelfId, pSelfId, pSyncNode->vgId);

  // the destination is taken from the message that was just built
  int32_t ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
  ASSERT(ret == 0);

  syncPingDestroy(pMsg);
  return ret;
}

/**
 * Ping every peer (all replicas except this node), one message per peer.
 *
 * @param pSyncNode  sending node
 * @return the result of the last syncNodePing() call, or 0 when there are no
 *         peers; every individual result is asserted to be 0
 */
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  int     peerIdx = 0;

  while (peerIdx < pSyncNode->peersNum) {
    SRaftId*  destId = &(pSyncNode->peersId[peerIdx]);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);

    ret = syncNodePing(pSyncNode, destId, pMsg);
    ASSERT(ret == 0);

    syncPingDestroy(pMsg);
    ++peerIdx;
  }

  return ret;
}

/**
 * Ping every replica listed in the raft config, this node included.
 *
 * @param pSyncNode  sending node
 * @return the result of the last syncNodePing() call, or 0 when the config has
 *         no replicas; every individual result is asserted to be 0
 */
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  int     replicaIdx = 0;

  while (replicaIdx < pSyncNode->pRaftCfg->cfg.replicaNum) {
    SRaftId*  destId = &(pSyncNode->replicasId[replicaIdx]);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);

    ret = syncNodePing(pSyncNode, destId, pMsg);
    ASSERT(ret == 0);

    syncPingDestroy(pMsg);
    ++replicaIdx;
  }

  return ret;
}

// timer control --------------
/**
 * Arm the node's ping timer and publish the user-side logic clock as the
 * current ping logic clock.
 *
 * When the sync environment is not initialized only an error is logged.
 *
 * @param pSyncNode  node whose ping timer is started
 * @return always 0
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);

  return 0;
}

/**
 * Stop the node's ping timer: increments the user-side ping logic clock,
 * stops the timer and clears the handle.
 *
 * @param pSyncNode  node whose ping timer is stopped
 * @return always 0
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/**
 * Arm the node's election timer with a timeout of `ms` milliseconds.
 *
 * A fresh SElectTimer carrying the node and its current election logic clock
 * is allocated and passed to the timer callback as its parameter.
 *
 * @param pSyncNode  node whose election timer is started
 * @param ms         timeout in milliseconds; stored in electTimerMS
 * @return 0 when the timer was armed or the sync environment is not
 *         initialized (an error is logged in the latter case);
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the callback data
 *         cannot be allocated (the timer is not armed).
 *
 * NOTE(review): the SElectTimer is not freed in this function -- presumably the
 * timer callback owns it; confirm.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    // do not dereference a failed allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start elect timer error, out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/**
 * Stop the node's election timer: increments the election logic clock, stops
 * the timer and clears the handle.
 *
 * @param pSyncNode  node whose election timer is stopped
 * @return always 0
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/**
 * Stop the election timer and start it again with a timeout of `ms`.
 * The results of the stop/start calls are not propagated.
 *
 * @param pSyncNode  node whose election timer is restarted
 * @param ms         new timeout in milliseconds
 * @return always 0
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1399 1400
/**
 * Restart the election timer with a freshly chosen timeout and log the choice.
 *
 * A stand-by node (pRaftCfg->isStandBy) uses TIMER_MAX_MS; any other node uses
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 *
 * @param pSyncNode  node whose election timer is reset
 * @return the result of syncNodeRestartElectTimer()
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine,
           2 * pSyncNode->electBaseLine, electMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return ret;
}

M
Minghao Li 已提交
1420
/**
 * Arm the node-wide heartbeat timer and publish the user-side heartbeat logic
 * clock as the current one. When the sync environment is not initialized only
 * an error is logged. An event-log entry is written in both cases.
 *
 * @param pSyncNode  node whose heartbeat timer is started
 * @return always 0
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return 0;
}

M
Minghao Li 已提交
1439
/**
 * Start the per-peer heartbeat timer of every peer for which
 * syncNodeGetHbTimer() returns a timer. Results of the individual starts are
 * not propagated. The node-wide heartbeat timer path is compiled out.
 *
 * @param pSyncNode  node whose peer heartbeat timers are started
 * @return always 0
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int peerIdx = 0; peerIdx < pSyncNode->peersNum; ++peerIdx) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peerIdx]));
    if (pPeerTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1457 1458
/**
 * Stop the per-peer heartbeat timer of every peer for which
 * syncNodeGetHbTimer() returns a timer. The node-wide heartbeat timer path is
 * compiled out.
 *
 * @param pSyncNode  node whose peer heartbeat timers are stopped
 * @return always 0
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int peerIdx = 0; peerIdx < pSyncNode->peersNum; ++peerIdx) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peerIdx]));
    if (pPeerTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return ret;
}

1476 1477 1478 1479 1480 1481
/**
 * Stop and then start the node's heartbeat timers. The results of both calls
 * are ignored.
 *
 * @param pSyncNode  node whose heartbeat timers are restarted
 * @return always 0
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1482 1483 1484 1485
// utils --------------
// Send pMsg to the node identified by destRaftId through the node's
// syncSendMSg callback.
//
// Returns 0 after handing the message to the callback (the callback's own
// result is not inspected), or -1 when no send callback is installed.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl: convert the message content before it goes on the wire
  syncUtilMsgHtoN(pMsg->pCont);

  // fire-and-forget: mark the message as not expecting a response
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);

  return 0;
}

// Send pMsg to the node described by nodeInfo through the node's
// syncSendMSg callback.
//
// Returns 0 after handing the message to the callback (the callback's own
// result is not inspected), or -1 when no send callback is installed, which
// matches the behavior of syncNodeSendMsgById.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    // report the failure instead of pretending the message was sent
    return -1;
  }

  // htonl: convert the message content before it goes on the wire
  syncUtilMsgHtoN(pMsg->pCont);

  // fire-and-forget: mark the message as not expecting a response
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);

  return 0;
}

M
Minghao Li 已提交
1515
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
C
Cary Xu 已提交
1516
  char   u64buf[128] = {0};
M
Minghao Li 已提交
1517 1518
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
1519 1520 1521
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
1522
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
1523
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
1524 1525 1526
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
1527 1528 1529
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
1530
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1531
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
S
Shengliang Guan 已提交
1532 1533
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg);
    cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf);
M
Minghao Li 已提交
1534

S
Shengliang Guan 已提交
1535
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1536
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
S
Shengliang Guan 已提交
1537 1538
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg);
    cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf);
M
Minghao Li 已提交
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
1557

M
Minghao Li 已提交
1558 1559 1560 1561 1562 1563
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
1564

M
Minghao Li 已提交
1565 1566 1567 1568 1569 1570 1571
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
1572
    // life cycle
S
Shengliang Guan 已提交
1573
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->rid);
M
Minghao Li 已提交
1574 1575
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
1576 1577
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
1578
    cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state));
M
Minghao Li 已提交
1579
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
S
Shengliang Guan 已提交
1591
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->commitIndex);
M
Minghao Li 已提交
1592 1593
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
1594 1595 1596 1597 1598
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
1599 1600 1601 1602
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
S
Shengliang Guan 已提交
1603
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
1604
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1605
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1606 1607 1608
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
S
Shengliang Guan 已提交
1609
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
1610 1611 1612 1613 1614 1615
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
S
Shengliang Guan 已提交
1616
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
1617 1618 1619
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
S
Shengliang Guan 已提交
1620
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
M
Minghao Li 已提交
1621 1622 1623 1624 1625 1626
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
S
Shengliang Guan 已提交
1627
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
1628
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1629
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
1630 1631 1632
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
S
Shengliang Guan 已提交
1633
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663

    // restoreFinish
    cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);

    // snapshot senders
    cJSON* pSenders = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "senders", pSenders);
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
    }

    // snapshot receivers
    cJSON* pReceivers = cJSON_CreateArray();
1664
    cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
M
Minghao Li 已提交
1665 1666 1667

    // changing
    cJSON_AddNumberToObject(pRoot, "changing", pSyncNode->changing);
M
Minghao Li 已提交
1668 1669 1670 1671 1672 1673 1674
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
1675 1676 1677 1678 1679 1680 1681
// Render pSyncNode as a printed JSON string.
// The intermediate cJSON tree is released here; the returned buffer comes
// from cJSON_Print and is handed to the caller.
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

1682
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1683 1684 1685 1686
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1687
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1688
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1689 1690
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1691 1692 1693 1694 1695 1696 1697

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }
M
Minghao Li 已提交
1698

M
Minghao Li 已提交
1699
  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
1700 1701 1702 1703
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1704

1705 1706 1707
  char*   peerStateStr = syncNodePeerState2Str(pSyncNode);
  int32_t userStrLen = strlen(str) + strlen(peerStateStr);

M
Minghao Li 已提交
1708
  if (userStrLen < 256) {
M
Minghao Li 已提交
1709
    char logBuf[256 + 256];
1710 1711
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1712 1713
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1714 1715 1716
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1717
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1718
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1719 1720 1721 1722
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1723
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1724 1725 1726
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
1727
    // sDebug("%s", logBuf);
M
Minghao Li 已提交
1728 1729
    // sInfo("%s", logBuf);
    sTrace("%s", logBuf);
M
Minghao Li 已提交
1730

M
Minghao Li 已提交
1731
  } else {
M
Minghao Li 已提交
1732
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1733
    char* s = (char*)taosMemoryMalloc(len);
1734 1735
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1736 1737
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1738 1739 1740
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1741
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1742
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1743 1744 1745 1746
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1747
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1748 1749 1750
    } else {
      snprintf(s, len, "%s", str);
    }
1751
    // sDebug("%s", s);
M
Minghao Li 已提交
1752 1753
    // sInfo("%s", s);
    sTrace("%s", s);
M
Minghao Li 已提交
1754 1755
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1756

M
Minghao Li 已提交
1757
  taosMemoryFree(peerStateStr);
M
Minghao Li 已提交
1758
  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1759 1760
}

1761
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1762 1763 1764 1765
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1766 1767 1768
  int32_t userStrLen = strlen(str);

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1769
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1770 1771
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }

  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1785 1786

  if (userStrLen < 256) {
M
Minghao Li 已提交
1787
    char logBuf[256 + 256];
1788 1789
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1790 1791
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1792 1793 1794
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1795
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1796
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1797 1798 1799 1800
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1801
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1802 1803 1804
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
M
Minghao Li 已提交
1805 1806 1807
    sError("%s", logBuf);

  } else {
M
Minghao Li 已提交
1808
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1809
    char* s = (char*)taosMemoryMalloc(len);
1810 1811
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1812 1813
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1814 1815 1816
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1817
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1818
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1819 1820 1821 1822
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1823
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1824 1825 1826
    } else {
      snprintf(s, len, "%s", str);
    }
M
Minghao Li 已提交
1827 1828 1829
    sError("%s", s);
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1830 1831

  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1832 1833
}

1834
inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1835 1836
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
M
Minghao Li 已提交
1837 1838 1839 1840 1841 1842 1843 1844

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);

M
Minghao Li 已提交
1845
  snprintf(s, len,
M
Minghao Li 已提交
1846 1847 1848 1849
           "vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
           ", sby:%d, "
           "r-num:%d, "
           "lcfg:%" PRId64 ", chging:%d, rsto:%d",
1850 1851 1852
           pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
           logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
           pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
M
Minghao Li 已提交
1853

M
Minghao Li 已提交
1854 1855 1856
  return s;
}

1857
// Report whether this node is listed in `config`.
//
// Membership is computed twice: by comparing fqdn/port against myNodeInfo,
// and by comparing the raft id derived from each entry against myRaftId.
// The two answers are asserted to agree; the fqdn/port answer is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEndpoint = false;
  bool foundByRaftId = false;

  // pass 1: match on fqdn + port
  int idx = 0;
  while (idx < config->replicaNum && !foundByEndpoint) {
    const bool sameFqdn = (strcmp((config->nodeInfo)[idx].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0);
    if (sameFqdn && (config->nodeInfo)[idx].nodePort == pSyncNode->myNodeInfo.nodePort) {
      foundByEndpoint = true;
    }
    ++idx;
  }

  // pass 2: match on the raft id built from each entry
  idx = 0;
  while (idx < config->replicaNum && !foundByRaftId) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[idx].nodeFqdn, (config->nodeInfo)[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;

    if (syncUtilSameId(&candidate, &(pSyncNode->myRaftId))) {
      foundByRaftId = true;
    }
    ++idx;
  }

  ASSERT(foundByEndpoint == foundByRaftId);
  return foundByEndpoint;
}

1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896
// Return true when the two configs differ in replica count, in myIndex, or
// in the fqdn/port of any replica slot (compared position by position).
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum) {
    return true;
  }
  if (pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t slot = 0; slot < pOldCfg->replicaNum; ++slot) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[slot];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[slot];

    const bool sameFqdn = (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0);
    if (!sameFqdn) {
      return true;
    }
    if (pBefore->nodePort != pAfter->nodePort) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1897
// Apply a new replica configuration to the node.
//
// pSyncNode:             node whose configuration is replaced.
// pNewConfig:            configuration to install.
// lastConfigChangeIndex: index recorded as the last config change.
//
// Nothing happens when the new config equals the current one. Otherwise the
// config is stored, the standby flag is adjusted, and then:
//  - if this node is in the new config: peers/replicas/quorum, the index and
//    vote managers and the snapshot senders are rebuilt (senders of replicas
//    that stay are reused), the config is persisted, and the node re-enters
//    its leader or follower role;
//  - otherwise only the config is persisted and an event is logged.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the real vgroup id rather than a hard-coded one
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node was a member before and is not any more
  bool isDrop = (IamInOld && !IamInNew);

  // log begin config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "begin do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    //-----------------------------------------

    // save snapshot senders
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];

      char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old");
      syncNodeEventLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int peerCount = 0;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerCount] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerCount++;
      }
    }
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: reuse the sender of every replica that was already known
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      // reset sender
      bool reset = false;
      for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
        // skip old slots with no sender left, so a NULL sender is never
        // dereferenced below
        if (oldSenders[j] != NULL && syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);

          do {
            char eventLog[256];
            snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p",
                     (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
            syncNodeEventLog(pSyncNode, eventLog);
          } while (0);

          (pSyncNode->senders)[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          do {
            char eventLog[256];
            snprintf(eventLog, sizeof(eventLog),
                     "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
                     port, (pSyncNode->senders)[i], reset);
            syncNodeEventLog(pSyncNode, eventLog);
          } while (0);
        }
      }
    }

    // create new
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);

        char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new");
        syncNodeEventLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      }
    }

    // free old
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);

        do {
          char eventLog[128];
          snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
          syncNodeEventLog(pSyncNode, eventLog);
        } while (0);

        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);
    syncNodeEventLog(pSyncNode, tmpbuf);
  }

  // log end config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "end do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);
}

M
Minghao Li 已提交
2114 2115 2116 2117
// raft state change --------------
// Adopt `term` when it is newer than the stored current term: store it via
// raftStoreSetTerm(), switch to follower, then clear the recorded vote.
// Does nothing when `term` is not greater than the current term.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  // reason string handed to the follower transition for its trace log
  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRIu64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

2125 2126 2127 2128 2129 2130
// Store `term` as the current term when it is newer, without changing the
// node's role or touching the recorded vote.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;  // not newer, nothing to update
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
2131
// Step down in reaction to `newTerm`.
//   - current term > newTerm : log "ignore" and return without any change.
//   - current term < newTerm : store newTerm, become follower, clear the vote.
//   - current term == newTerm: become follower only if not already a follower.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  const SyncTerm currentTerm = pSyncNode->pRaftStore->currentTerm;
  char           eventBuf[128];

  if (currentTerm > newTerm) {
    snprintf(eventBuf, sizeof(eventBuf), "step down, ignore, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm,
             currentTerm);
    syncNodeEventLog(pSyncNode, eventBuf);
    return;
  }

  snprintf(eventBuf, sizeof(eventBuf), "step down, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm,
           currentTerm);
  syncNodeEventLog(pSyncNode, eventBuf);

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRIu64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);

    raftStoreClearVote(pSyncNode->pRaftStore);
    return;
  }

  // same term: only a non-follower needs to transition
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}

2161 2162
// On a leadership change, hand the node's response manager to syncRespCleanRsp().
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

2163
// Switch this node into the follower role.
//
// Steps, in order: clear the leader cache if we were leader, set the state,
// stop the heartbeat timer, reset the election timer, clean pending client
// responses, invoke the FSM's become-follower callback (if set), reset
// minMatchIndex, and write a trace event containing `debugStr`.
//
// debugStr: non-NULL, NUL-terminated reason text appended to the event log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  do {
    char   stackLog[256 + 64];
    char*  heapLog = NULL;
    char*  eventLog = stackLog;
    size_t eventLogSize = sizeof(stackLog);
    size_t debugStrLen = strlen(debugStr);

    // Long reasons get a heap buffer sized for prefix + reason. If the
    // allocation fails we fall back to the stack buffer and accept truncation.
    if (debugStrLen >= 256) {
      heapLog = taosMemoryMalloc(debugStrLen + 64);
      if (heapLog != NULL) {
        eventLog = heapLog;
        eventLogSize = debugStrLen + 64;
      }
    }

    // The size passed here must be the buffer size, not the reason length,
    // otherwise the tail of the reason is cut off.
    snprintf(eventLog, eventLogSize, "become follower %s", debugStr);
    syncNodeEventLog(pSyncNode, eventLog);

    if (heapLog != NULL) {
      taosMemoryFree(heapLog);
    }
  } while (0);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
2221
// Switch this node into the leader role (see the BecomeLeader TLA+ spec above).
//
// Steps, in order: record the leader timestamp, reset restoreFinish, set the
// state and leader cache, initialise nextIndex[] to lastIndex + 1 and
// matchIndex[] to SYNC_INDEX_INVALID, reset per-peer send state, force-stop a
// running snapshot receiver, stop the election timer, start the heartbeat
// timer and send heartbeats immediately, invoke the FSM's become-leader
// callback (if set), reset minMatchIndex, and write a trace event.
//
// debugStr: non-NULL, NUL-terminated reason text appended to the event log.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // maybe wal is deleted, so take the last index through the snapshot-aware
  // lookup instead of reading the log store directly. The value does not
  // depend on the replica slot, so it is fetched once for all of them.
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);

  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  do {
    char   stackLog[256 + 64];
    char*  heapLog = NULL;
    char*  eventLog = stackLog;
    size_t eventLogSize = sizeof(stackLog);
    size_t debugStrLen = strlen(debugStr);

    // Long reasons get a heap buffer sized for prefix + reason. If the
    // allocation fails we fall back to the stack buffer and accept truncation.
    if (debugStrLen >= 256) {
      heapLog = taosMemoryMalloc(debugStrLen + 64);
      if (heapLog != NULL) {
        eventLog = heapLog;
        eventLogSize = debugStrLen + 64;
      }
    }

    // The size passed here must be the buffer size, not the reason length,
    // otherwise the tail of the reason is cut off.
    snprintf(eventLog, eventLogSize, "become leader %s", debugStr);
    syncNodeEventLog(pSyncNode, eventLog);

    if (heapLog != NULL) {
      taosMemoryFree(heapLog);
    }
  } while (0);
}

// Candidate -> leader transition. Asserts that the node is a candidate holding
// a majority of granted votes, becomes leader, appends a noop entry and tries
// to advance the commit index, then replicates when there is more than one
// replica.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  // a single-replica group has nobody to replicate to
  if (pSyncNode->replicaNum <= 1) {
    return;
  }
  syncNodeReplicate(pSyncNode);
}

M
Minghao Li 已提交
2323 2324
// Returns true when this node's vgId equals 1.
// NOTE(review): presumably vgId 1 is reserved for the mnode, as the function
// name suggests -- confirm against the vgId allocation rules.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
2325 2326 2327 2328 2329 2330 2331
// Reset the send bookkeeping of every peer slot (all TSDB_MAX_REPLICA of them):
// lastSendTime becomes 0 and lastSendIndex becomes SYNC_INDEX_INVALID.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  for (int slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }

  return 0;
}

// Follower -> candidate transition. Asserts the node is currently a follower,
// flips the state field and records the transition in the event log.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeEventLog(pSyncNode, "follower to candidate");
}

// Leader -> follower transition. Asserts the node is currently the leader,
// performs the follower transition and records it in the event log.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");
  syncNodeEventLog(pSyncNode, "leader to follower");
}

// Candidate -> follower transition. Asserts the node is currently a candidate,
// performs the follower transition and records it in the event log.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
  syncNodeEventLog(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
2356 2357 2358

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
2359
// Record a vote for `pRaftId` in the raft store.
// Preconditions (asserted): `term` is the current term and no vote has been
// cast yet in the raft store.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
2366
// simulate get vote from outside
M
Minghao Li 已提交
2367 2368 2369
// Vote for ourselves in the current term, then feed a locally built, granted
// request-vote reply (from self to self) into the vote bookkeeping, as if the
// vote had arrived from outside.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pSelfReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pSelfReply->term = pSyncNode->pRaftStore->currentTerm;
  pSelfReply->voteGranted = true;
  pSelfReply->srcId = pSyncNode->myRaftId;
  pSelfReply->destId = pSyncNode->myRaftId;

  voteGrantedVote(pSyncNode->pVotesGranted, pSelfReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pSelfReply);

  syncRequestVoteReplyDestroy(pSelfReply);
}

M
Minghao Li 已提交
2381
// snapshot --------------
M
Minghao Li 已提交
2382 2383

// return if has a snapshot
M
Minghao Li 已提交
2384 2385
// Returns true when the FSM provides FpGetSnapshotInfo and the reported
// lastApplyIndex is at least SYNC_INDEX_BEGIN; false otherwise.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
2396 2397
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
2398
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2399
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2400 2401
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2402 2403 2404 2405 2406 2407 2408
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2409 2410
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2411 2412
// Returns the term that goes with the last index:
//   - no snapshot: the log store's last term;
//   - snapshot present: the log store's last term when the log extends past the
//     snapshot's lastApplyIndex, otherwise the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex > snapshot.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  return snapshot.lastApplyTerm;
}

// get last index and term along with snapshot
// Fetch the last index and last term (both snapshot-aware) into the two
// out-parameters. Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  const SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  const SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
2441

M
Minghao Li 已提交
2442
// return append-entries first try index
M
Minghao Li 已提交
2443 2444 2445 2446 2447
// Index right after the node's last index (log or snapshot, whichever is larger).
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2448 2449
// if index > 0, return index - 1
// else, return -1
2450 2451 2452 2453 2454 2455 2456 2457 2458
// Returns index - 1, clamped from below to SYNC_INDEX_INVALID.
// pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  const SyncIndex candidate = index - 1;
  return (candidate < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : candidate;
}

M
Minghao Li 已提交
2459 2460 2461 2462
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475
// Returns the term of the entry preceding `index`.
//   - index <  SYNC_INDEX_BEGIN: SYNC_TERM_INVALID
//   - index == SYNC_INDEX_BEGIN: 0
//   - otherwise: the term of log entry index-1; if the log lookup fails, the
//     snapshot's lastApplyTerm when the snapshot ends exactly at index-1.
// When neither source yields a term, an error is logged and
// SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  const SyncIndex prevIndex = index - 1;
  SSyncRaftEntry* pPrevEntry = NULL;
  int32_t         rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, prevIndex, &pPrevEntry);
  if (rc == 0) {
    // found in the log: copy the term out before releasing the entry
    ASSERT(pPrevEntry != NULL);
    SyncTerm prevTerm = pPrevEntry->term;
    taosMemoryFree(pPrevEntry);
    return prevTerm;
  }

  // not in the log: see whether the snapshot covers exactly prevIndex
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == prevIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  char errBuf[128];
  snprintf(errBuf, sizeof(errBuf),
           "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRIu64, index,
           snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  syncNodeErrorLog(pSyncNode, errBuf);

  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2506 2507 2508 2509

// get pre index and term of "index"
// Fetch the index and term preceding `index` into the two out-parameters.
// Always returns 0; a failed term lookup is reported through *pPreTerm
// (syncNodeGetPreTerm yields SYNC_TERM_INVALID in that case).
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  const SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  const SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIndex;
  *pPreTerm = preTerm;
  return 0;
}

M
Minghao Li 已提交
2514 2515 2516
// for debug --------------
// Serialize the node with syncNode2Str(), print it (with its length) to
// stdout, flush all output streams and free the serialized text.
void syncNodePrint(SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  printf("syncNodePrint | len:%d | %s \n", textLen, text);
  fflush(NULL);

  taosMemoryFree(text);
}

// Same as syncNodePrint(), with the caller-supplied label `s` printed in
// front of the serialized node.
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  printf("syncNodePrint2 | len:%d | %s | %s \n", textLen, s, text);
  fflush(NULL);

  taosMemoryFree(text);
}

// Serialize the node with syncNode2Str(), emit it through sTraceLong and free
// the serialized text.
void syncNodeLog(SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  sTraceLong("syncNodeLog | len:%d | %s", textLen, text);

  taosMemoryFree(text);
}

// Like syncNodeLog() with a label `s`, but only when gRaftDetailLog is set;
// otherwise the node is not even serialized.
void syncNodeLog2(char* s, SSyncNode* pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  sTraceLong("syncNodeLog2 | len:%d | %s | %s", textLen, s, text);

  taosMemoryFree(text);
}

M
Minghao Li 已提交
2543 2544
// Like syncNodeLog() with a label `s`; unlike syncNodeLog2() it is not gated
// by gRaftDetailLog.
void syncNodeLog3(char* s, SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  sTraceLong("syncNodeLog3 | len:%d | %s | %s", textLen, s, text);

  taosMemoryFree(text);
}

M
Minghao Li 已提交
2549
// ------ local function ---------
M
Minghao Li 已提交
2550
// enqueue message ----
M
Minghao Li 已提交
2551 2552
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
2553
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
2554
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
2555
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2556 2557
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
2558
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
S
Shengliang Guan 已提交
2559 2560
    if (pSyncNode->syncEqMsg != NULL) {
      int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
2561
      if (code != 0) {
S
Shengliang Guan 已提交
2562
        sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
2563 2564 2565 2566
        rpcFreeCont(rpcMsg.pCont);
        syncTimeoutDestroy(pSyncMsg);
        return;
      }
M
Minghao Li 已提交
2567
    } else {
S
Shengliang Guan 已提交
2568
      sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
M
Minghao Li 已提交
2569
    }
M
Minghao Li 已提交
2570 2571
    syncTimeoutDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2572 2573
    if (syncIsInit()) {
      taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
2574 2575 2576 2577 2578
                   &pSyncNode->pPingTimer);
    } else {
      sError("sync env is stop, syncNodeEqPingTimer");
    }

M
Minghao Li 已提交
2579
  } else {
S
Shengliang Guan 已提交
2580
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64,
M
Minghao Li 已提交
2581
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
2582 2583 2584 2585
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
M
Minghao Li 已提交
2586 2587
  SElectTimer* pElectTimer = (SElectTimer*)param;
  SSyncNode*   pSyncNode = pElectTimer->pSyncNode;
M
Minghao Li 已提交
2588

M
Minghao Li 已提交
2589 2590
  SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
                                            pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2591 2592
  SRpcMsg      rpcMsg;
  syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
2593
  if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
S
Shengliang Guan 已提交
2594
    int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
2595 2596 2597 2598
    if (code != 0) {
      sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2599
      taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2600
      return;
2601
    }
M
Minghao Li 已提交
2602 2603 2604

    do {
      char logBuf[128];
M
Minghao Li 已提交
2605
      snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock);
M
Minghao Li 已提交
2606
      syncNodeEventLog(pSyncNode, logBuf);
M
Minghao Li 已提交
2607 2608
    } while (0);

M
Minghao Li 已提交
2609
  } else {
S
Shengliang Guan 已提交
2610
    sTrace("syncNodeEqElectTimer syncEqMsg is NULL");
M
Minghao Li 已提交
2611
  }
M
Minghao Li 已提交
2612

M
Minghao Li 已提交
2613
  syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2614
  taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2615

M
Minghao Li 已提交
2616
#if 0
M
Minghao Li 已提交
2617
  // reset timer ms
S
Shengliang Guan 已提交
2618
  if (syncIsInit() && pSyncNode->electBaseLine > 0) {
M
Minghao Li 已提交
2619
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
S
Shengliang Guan 已提交
2620
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
M
Minghao Li 已提交
2621 2622 2623
                 &pSyncNode->pElectTimer);
  } else {
    sError("sync env is stop, syncNodeEqElectTimer");
M
Minghao Li 已提交
2624
  }
M
Minghao Li 已提交
2625
#endif
M
Minghao Li 已提交
2626 2627
}

M
Minghao Li 已提交
2628 2629
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
2630 2631 2632

  syncNodeEventLog(pSyncNode, "eq hb timer");

2633 2634 2635 2636 2637 2638 2639 2640 2641
  if (pSyncNode->replicaNum > 1) {
    if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
        atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
      SyncTimeout* pSyncMsg =
          syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
                            pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
      SRpcMsg rpcMsg;
      syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
      syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
S
Shengliang Guan 已提交
2642 2643
      if (pSyncNode->syncEqMsg != NULL) {
        int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
2644
        if (code != 0) {
S
Shengliang Guan 已提交
2645
          sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
2646 2647 2648 2649 2650
          rpcFreeCont(rpcMsg.pCont);
          syncTimeoutDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
2651
        sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
2652
      }
2653
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2654

S
Shengliang Guan 已提交
2655 2656
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
2657 2658 2659 2660
                     &pSyncNode->pHeartbeatTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }
2661
    } else {
2662 2663 2664
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
             "",
             pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
2665
    }
M
Minghao Li 已提交
2666 2667 2668
  }
}

2669 2670 2671 2672 2673
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
2674 2675 2676 2677
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

S
Shengliang Guan 已提交
2678
  // syncNodeEventLog(pSyncNode, "eq peer hb timer");
2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
2690
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
2691 2692 2693 2694 2695 2696 2697
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
2698 2699
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
2700 2701 2702 2703 2704 2705 2706
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
2707
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
2708 2709 2710 2711
      }
#endif

      // send msg
M
Minghao Li 已提交
2712
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
2713 2714 2715

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2716 2717
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock,
             msgLogicClock);
    }
  }
}

M
Minghao Li 已提交
2730 2731
// Build a noop raft entry for the current term at the log's write index, wrap
// it in a client request and enqueue it through the node's syncEqMsg callback.
// The node must be leader (asserted).
//
// Returns 0 on success or when no syncEqMsg callback is set, -1 when the
// enqueue callback reports an error (the rpc content is freed in that case,
// matching the timer callbacks in this file).
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  ASSERT(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  ASSERT(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->syncEqMsg != NULL) {
    int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsg);
    if (code != 0) {
      // do not silently drop a failed enqueue: report it and release the content
      sError("vgId:%d, sync enqueue noop msg error, code:%d", ths->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      ret = -1;
    }
  } else {
    sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL");
  }

  syncEntryDestory(pEntry);
  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);

  return ret;
}

2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773
// Deleter passed to taosLRUCacheInsert(): frees the cached value.
// `key` and `keyLen` are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Insert `pEntry` into the log store's LRU cache, keyed by its index, with
// low priority and deleteCacheEntry as deleter. The charge passed to the
// cache is sizeof(*pEntry) + pEntry->dataLen. `h` is forwarded to
// taosLRUCacheInsert() as its handle out-parameter.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int       charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2774 2775 2776
// Build a noop raft entry for the current term at the log's write index, try
// to put it in the log store's LRU cache, and -- only when this node is
// leader -- append it to the log.
//
// Returns 0 on success, -1 when the append fails. On every path the cache
// handle is released (entry cached) or the entry is destroyed (not cached);
// the append-failure path used to return before doing either.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      syncNodeErrorLog(ths, "append noop error");
      ret = -1;  // fall through to the cleanup below instead of returning here
    }
  }

  if (h) {
    // a handle was returned by the cache insert; drop our reference to it
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    // no handle was returned; destroy the entry ourselves
    syncEntryDestory(pEntry);
  }

  return ret;
}

M
Minghao Li 已提交
2802
// on message ----
M
Minghao Li 已提交
2803 2804 2805
// Handle an incoming ping: build a ping-reply from this node to the sender, convert it to an
// RPC message, send it to the reply's destination and destroy the reply object.
// Always returns 0.
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
  sTrace("vgId:%d, recv sync-ping", ths->vgId);

  SRpcMsg        replyRpc;
  SyncPingReply* pReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  syncPingReply2RpcMsg(pReply, &replyRpc);

  // the htonl conversion of the message head (contLen, vgId) is intentionally not done here

  syncNodeSendMsgById(&pReply->destId, ths, &replyRpc);
  syncPingReplyDestroy(pReply);

  return 0;
}

M
Minghao Li 已提交
2823
// Handle a ping reply: only a trace line is emitted; pMsg is not inspected.
// Always returns 0.
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
  return 0;
}
M
Minghao Li 已提交
2828

2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840
// Handle a heartbeat from a peer.
//  - Same term and this node is not leader: reset the election timer and adopt the sender's
//    minMatchIndex; a follower additionally enqueues a local FOLLOWER_CMT command carrying the
//    sender's commit index.
//  - Term >= current term and this node is not a follower: enqueue a local STEP_DOWN command.
// A heartbeat reply is always sent back to the sender. Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // the commit is done via an enqueued local command instead of calling
      // syncNodeFollowerCommit(ths, pMsg->commitIndex) directly
      SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->fcIndex = pMsg->commitIndex;

      SRpcMsg rpcMsgLocalCmd;
      syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRId64, ths->vgId, pSyncMsg->fcIndex);
        }
      } else {
        // nothing can take the payload, so free it here instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }

      // the command object was previously leaked on this branch
      syncLocalCmdDestroy(pSyncMsg);
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // the step-down is done via an enqueued local command instead of calling
    // syncNodeStepDown(ths, pMsg->term) directly
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      // nothing can take the payload, so free it here instead of leaking it
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  // the htonl conversion of the message head (contLen, vgId) is intentionally not done here

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

// Handle a heartbeat reply: log it, then hand the reply's startTime to the match-index manager.
// Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // update last reply time, make decision whether the other node is alive or not
  // NOTE(review): the time is recorded under pMsg->destId. syncNodeOnHeartbeat fills a reply's
  // destId with the heartbeat sender and its srcId with the replying node, so srcId may be the
  // intended key here -- confirm against syncIndexMgrSetRecvTime before changing.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime);

  return 0;
}

M
Minghao Li 已提交
2911
// Dispatch a locally enqueued command:
//   SYNC_LOCAL_CMD_STEP_DOWN    -> syncNodeStepDown with the command's new term
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> syncNodeFollowerCommit with the command's commit index
// Any other command is reported through the node's error log. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  switch (pMsg->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pMsg->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pMsg->fcIndex);
      break;

    default:
      syncNodeErrorLog(ths, "error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2927 2928 2929 2930 2931 2932 2933 2934 2935 2936
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2937

M
Minghao Li 已提交
2938
// Handle a client request: build a raft entry for the current term at the log's write index,
// offer it to the entry cache and, when this node is the leader, append it to the log.
// After a successful append the leader replicates (more than one replica) or tries to advance
// the commit index (single replica).
//
// pRetIndex (optional) receives the index of the built entry, or SYNC_INDEX_INVALID when the
// append fails.
// Returns 0 on success (also when the node is not leader and nothing is appended),
// -1 when the append fails.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
  syncNodeEventLog(ths, "on client request");

  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index);
  ASSERT(pEntry != NULL);

  // h stays NULL when the cache did not hand back a handle; the entry is then destroyed below
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      // del resp mgr, call FpCommitCb
      ASSERT(0);
      // when ASSERT is compiled out, fall through so the handle / entry is still released
      ret = -1;
    } else {
      // if multi replica, start replicate right now
      if (ths->replicaNum > 1) {
        syncNodeReplicate(ths);
      }

      // if only myself, maybe commit right now
      if (ths->replicaNum == 1) {
        syncMaybeAdvanceCommitIndex(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = (ret == 0) ? pEntry->index : SYNC_INDEX_INVALID;
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}
M
Minghao Li 已提交
2988

S
Shengliang Guan 已提交
2989 2990 2991
// Map a sync state to its lowercase name: "follower", "candidate" or "leader".
// Any other value yields "error".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    return "follower";
  }
  if (state == TAOS_SYNC_STATE_CANDIDATE) {
    return "candidate";
  }
  if (state == TAOS_SYNC_STATE_LEADER) {
    return "leader";
  }
  return "error";
}
3001

3002
// Apply a leader-transfer entry on this node.
// Nothing is done (beyond an event-log line) unless the node is a follower, has finished
// restoring, the entry's term is not behind the current term, and the entry's index is not
// behind the last log index.
// When the transfer target is this node (same raft id, or same fqdn and port), the election
// timer is restarted with a 1 ms timeout. The FSM's FpLeaderTransferCb, if set, is then called
// with metadata describing the entry.
// Always returns 0.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    syncNodeEventLog(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little term:%" PRIu64 ", can not do leader transfer", pEntry->term);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  /*
    if (ths->vgId > 1) {
      syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
      return 0;
    }
  */

  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);

  do {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%" PRId64, pEntry->index);
    syncNodeEventLog(ths, logBuf);
  } while (0);

  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  bool same = sameId || sameNodeInfo;
  if (same) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    char eventLog[256];
    snprintf(eventLog, sizeof(eventLog), "maybe leader transfer to %s:%d %" PRIu64,
             pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
             pSyncLeaderTransfer->newLeaderId.addr);
    syncNodeEventLog(ths, eventLog);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    // Designated initializers: the previous `cbMeta.field = value` form was a list of positional
    // initializers, so the values were stored in declaration order, not in the named fields.
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093
// Find this node inside pNewCfg and store its position in pNewCfg->myIndex.
// Each configured node is turned into a raft id (address from fqdn + port, vgId of this node)
// and compared against this node's own raft id.
// Returns 0 when a match is found, -1 when this node is not part of the new configuration.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int pos = 0; pos < pNewCfg->replicaNum; pos++) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[pos].nodeFqdn, pNewCfg->nodeInfo[pos].nodePort);
    candidate.vgId = ths->vgId;

    if (!syncUtilSameId(&ths->myRaftId, &candidate)) {
      continue;
    }

    pNewCfg->myIndex = pos;
    return 0;
  }

  return -1;
}

M
Minghao Li 已提交
3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115
// Apply a config-change-finish entry: decode the finish message from pRpcMsg, notify the FSM
// through FpReConfigCb (if set) with the entry metadata and the old/new configurations, clear
// the node's `changing` flag, and log a summary of the change.
// Always returns 0.
static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg);
  ASSERT(pFinish);

  if (ths->pFsm->FpReConfigCb != NULL) {
    // members not listed here are zero-initialized
    SReConfigCbMeta meta = {
        .code = 0,
        .index = pEntry->index,
        .term = pEntry->term,
        .seqNum = pEntry->seqNum,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .state = ths->state,
        .currentTerm = ths->pRaftStore->currentTerm,
        .isWeak = pEntry->isWeak,
        .flag = 0,
        .oldCfg = pFinish->oldCfg,
        .newCfg = pFinish->newCfg,
        .newCfgIndex = pFinish->newCfgIndex,
        .newCfgTerm = pFinish->newCfgTerm,
        .newCfgSeqNum = pFinish->newCfgSeqNum,
    };

    ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, &meta);
  }

  // clear changing
  ths->changing = false;

  char* oldCfgText = syncCfg2SimpleStr(&pFinish->oldCfg);
  char* newCfgText = syncCfg2SimpleStr(&pFinish->newCfg);

  char summary[512];
  snprintf(summary, sizeof(summary), "config change finish from %d to %d, index:%" PRId64 ", %s  -->  %s",
           pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex, oldCfgText, newCfgText);

  taosMemoryFree(oldCfgText);
  taosMemoryFree(newCfgText);
  syncNodeEventLog(ths, summary);

  syncReconfigFinishDestroy(pFinish);
  return 0;
}

// Apply a config-change entry: mark the node as `changing`, parse the new configuration from
// the RPC payload, locate this node in it, perform the change, and fill pFinish with the old
// and new configurations plus the entry's index, term and sequence number.
// Always returns 0 (a parse failure trips the ASSERT).
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
                                    SyncReconfigFinish* pFinish) {
  // set changing
  ths->changing = true;

  // snapshot of the configuration in effect before the change is applied
  SSyncCfg cfgBefore = ths->pRaftCfg->cfg;

  // configuration carried by the entry
  SSyncCfg cfgAfter;
  int32_t  ret = syncCfgFromStr(pRpcMsg->pCont, &cfgAfter);
  ASSERT(ret == 0);

  // fix up myIndex in the new configuration, then switch over to it
  syncNodeUpdateNewConfigIndex(ths, &cfgAfter);
  syncNodeDoConfigChange(ths, &cfgAfter, pEntry->index);

  // describe the finished change for the caller
  pFinish->oldCfg = cfgBefore;
  pFinish->newCfg = cfgAfter;
  pFinish->newCfgIndex = pEntry->index;
  pFinish->newCfgTerm = pEntry->term;
  pFinish->newCfgSeqNum = pEntry->seqNum;

  return 0;
}
3164

M
Minghao Li 已提交
3165 3166 3167 3168 3169 3170 3171 3172 3173
// Propose the config-change-finish message built from pFinish.
// When the proposal is rejected, an error is logged and the node's `changing` flag is cleared.
// Always returns 0, including when the proposal fails.
static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) {
  SRpcMsg finishRpc;
  syncReconfigFinish2RpcMsg(pFinish, &finishRpc);

  if (syncNodePropose(ths, &finishRpc, false) != 0) {
    sError("syncNodeProposeConfigChangeFinish error");
    ths->changing = false;
  }

  return 0;
}

3177 3178 3179 3180
// True when all three hold: the group has exactly one replica, the message type is a user
// commit (per syncUtilUserCommit), and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
3181
// Commit the log entries in [beginIndex, endIndex] by feeding them to the FSM.
//
// If the FSM reports a snapshot whose lastApplyIndex covers beginIndex, the range is first
// advanced past the snapshot. Each remaining entry is taken from the entry cache or, on a
// cache miss, read from the log store; an entry that cannot be read is logged and skipped.
// Per entry:
//   - user-commit messages go to FpCommitCb, unless the node is a restored single replica
//     with vgId != 1 (internalExecute is false in that case);
//   - TDMT_SYNC_CONFIG_CHANGE applies the new configuration and, on the leader, proposes the
//     matching finish message;
//   - TDMT_SYNC_CONFIG_CHANGE_FINISH with a non-empty payload finishes the change;
//   - reaching the last log index marks the restore as finished (FpRestoreFinishCb is called
//     the first time this happens).
//
// flag is forwarded unchanged in the commit callback metadata.
// Returns 0, or -1 when ths is NULL and the range is not empty.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      char snapshotLog[128];
      snprintf(snapshotLog, sizeof(snapshotLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64,
               beginIndex, snapshot.lastApplyIndex);
      syncNodeEventLog(ths, snapshotLog);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  int32_t code = 0;

  char eventLog[128];
  snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
  syncNodeEventLog(ths, eventLog);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // start from NULL so the check below never reads an indeterminate pointer
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          // ASSERT(code == 0);
          // ASSERT(pEntry != NULL);
          if (code != 0 || pEntry == NULL) {
            syncNodeErrorLog(ths, "get log entry error");
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
            continue;
          }
        }

        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          do {
            char logBuf[128];
            snprintf(logBuf, sizeof(logBuf), "commit index:%" PRId64 ", internal:%d", i, internalExecute);
            syncNodeEventLog(ths, logBuf);
          } while (0);

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

            syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
          }
        }

        // config change
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
          SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId);
          ASSERT(pFinish != NULL);

          code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish);
          ASSERT(code == 0);

          if (ths->state == TAOS_SYNC_STATE_LEADER) {
            syncNodeProposeConfigChangeFinish(ths, pFinish);
          }
          syncReconfigFinishDestroy(pFinish);
        }

        // config change finish
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
          if (rpcMsg.pCont != NULL && rpcMsg.contLen > 0) {
            code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
            ASSERT(code == 0);
          }
        }

#if 0
        // execute in pre-commit
        // leader transfer
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
        }
#endif

        // restore finish
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;

            char restoreLog[128];
            snprintf(restoreLog, sizeof(restoreLog), "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms, ",
                     pEntry->index, restoreDelay);
            syncNodeEventLog(ths, restoreLog);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        // a cached entry is only released back to the cache; one read from the log store is destroyed
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
      }
    }
  }
  return 0;
}

// True when pRaftId equals one of the node's replica ids.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int pos = 0;
  while (pos < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[pos], pRaftId)) {
      return true;
    }
    pos++;
  }
  return false;
}

// Look up the snapshot sender that belongs to the replica with id pDestId.
// All replicas are scanned without stopping at the first hit, so the last matching slot wins.
// Returns NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;

  int slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    slot++;
  }

  return pFound;
}
M
Minghao Li 已提交
3346

3347 3348 3349 3350 3351 3352 3353 3354 3355 3356
// Look up the per-peer heartbeat timer that belongs to the replica with id pDestId.
// All replicas are scanned without stopping at the first hit, so the last matching slot wins.
// Returns NULL when no replica id matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  int slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerHeartbeatTimerArr[slot];
    }
    slot++;
  }

  return pFound;
}

M
Minghao Li 已提交
3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368
// Look up the peer-state record that belongs to the replica with id pDestId.
// All replicas are scanned without stopping at the first hit, so the last matching slot wins.
// Returns NULL when no replica id matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;

  int slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerStates[slot];
    }
    slot++;
  }

  return pFound;
}

// Decide whether an append-entries message should be sent to pDestId.
// Returns false when the peer has no state record (an error is logged), or when the same
// index (prevLogIndex + 1) was already sent to it less than SYNC_APPEND_ENTRIES_TIMEOUT_MS
// milliseconds ago. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // a different index than last time is always worth sending
  if (pPeer->lastSendIndex != nextIndex) {
    return true;
  }

  // same index: only resend once the timeout has elapsed
  return !(nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
}

M
Minghao Li 已提交
3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399
// Check whether a configuration change may start now.
// It is refused (with an error log) when a change is already in progress, when the commit
// index is at least SYNC_INDEX_BEGIN but differs from the last log index, or when the
// snapshot sender of any peer has its `start` flag set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) &&
      (pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode))) {
    sError("sync cannot change2");
    return false;
  }

  for (int peer = 0; peer < pSyncNode->peersNum; peer++) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender == NULL || !pPeerSender->start) {
      continue;
    }

    sError("sync cannot change3");
    return false;
  }

  return true;
}

3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422
// Map a timeout type to a short name: "ping", "elect" or "heartbeat".
// Any other value yields "unknown".
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  switch (timerType) {
    case SYNC_TIMEOUT_PING:
      return "ping";
    case SYNC_TIMEOUT_ELECTION:
      return "elect";
    case SYNC_TIMEOUT_HEARTBEAT:
      return "heartbeat";
    default:
      return "unknown";
  }
}

// Write an event-log line for a received timeout message: timer type, logic clock, timer
// period in ms and data pointer, followed by the caller-supplied suffix s.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  const char* typeName = syncTimerTypeStr(pMsg->timeoutType);

  char line[256];
  snprintf(line, sizeof(line), "recv sync-timer {type:%s, lc:%" PRIu64 ", ms:%d, data:%p}, %s", typeName,
           pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3428
// Write an event-log line for an outgoing request-vote: destination host:port, term, last log
// index and last log term, followed by the caller-supplied suffix s.
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3439
// Write an event-log line for a received request-vote: source host:port, term, last log index
// and last log term, followed by the caller-supplied suffix s.
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  char line[256];
  snprintf(line, sizeof(line),
           "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3450
// Write an event-log line for an outgoing request-vote reply: destination host:port, term and
// whether the vote was granted, followed by the caller-supplied suffix s.
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", peerHost,
           peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3460
// Write an event-log line for a received request-vote reply: source host:port, term and
// whether the vote was granted, followed by the caller-supplied suffix s.
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", peerHost,
           peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3470
// Write an event-log line for an outgoing append-entries: destination host:port, term,
// previous log index/term, private term, commit index and data length, followed by the
// caller-supplied suffix s.
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3485
// Write an event-log line for a received append-entries: source host:port, term, previous log
// index/term, commit index, private term and data length, followed by the caller-supplied
// suffix s.
// prevLogIndex and commitIndex are printed as signed values (PRId64), matching the other
// append-entries loggers in this file.
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
  char logBuf[256];
  snprintf(logBuf, sizeof(logBuf),
           "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", cmt:%" PRId64 ", pterm:%" PRIu64
           ", "
           "datalen:%d}, %s",
           host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
           pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

wafwerar's avatar
wafwerar 已提交
3500
// Write an event-log line for an outgoing append-entries batch: destination host:port, term,
// previous log index/term, private term, commit index, data length and entry count, followed
// by the caller-supplied suffix s.
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3513
// Write an event-log line for a received append-entries batch: source host:port, term,
// previous log index/term, private term, commit index, data length and entry count, followed
// by the caller-supplied suffix s.
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, line);
}

3526
// Write an event-log line for an outgoing append-entries reply: destination host:port, term,
// private term, success flag and match index, followed by the caller-supplied suffix s.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
           ", success:%d, match:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, line);
}

wafwerar's avatar
wafwerar 已提交
3538
// Write an event-log line for a received append-entries reply: source host:port, term,
// private term, success flag and match index, followed by the caller-supplied suffix s.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64
           ", success:%d, match:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, line);
}
3549 3550 3551 3552 3553 3554 3555

// Write an event-log line for an outgoing heartbeat: destination host:port, term, commit
// index, minimum match index and private term, followed by the caller-supplied suffix s.
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     line[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(line, sizeof(line),
           "send sync-heartbeat to %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64
           ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, line);
}

// Emit a node event line describing a received sync-heartbeat.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       heartbeat that was received; srcId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the source address into the host/port pair printed below
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "recv sync-heartbeat from %s:%d "
           "{term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);

  syncNodeEventLog(pSyncNode, eventLine);
}

// Emit a node event line describing an outgoing sync-heartbeat-reply.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       reply being sent; destId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);

  char logBuf[256];
  // The address printed is the destination (destId), so the direction word is "to",
  // matching the other send-side loggers; "from" is reserved for srcId on the recv side.
  snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply to %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           host, port, pMsg->term, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

// Emit a node event line describing a received sync-heartbeat-reply.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       reply that was received; srcId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the source address into the host/port pair printed below
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "recv sync-heartbeat-reply from %s:%d "
           "{term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, s);

  syncNodeEventLog(pSyncNode, eventLine);
}
3593 3594 3595 3596 3597 3598

// Emit a node event line describing a received sync-local-cmd.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       local command; its cmd value is printed both numerically and as the
//            string returned by syncLocalCmdGetStr(), followed by sdNewTerm
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  char        eventLine[256];
  const char* cmdName = syncLocalCmdGetStr(pMsg->cmd);

  snprintf(eventLine, sizeof(eventLine),
           "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s",
           pMsg->cmd, cmdName, pMsg->sdNewTerm, s);

  syncNodeEventLog(pSyncNode, eventLine);
}

// Emit a node event line describing an outgoing sync-pre-snapshot.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       message being sent; destId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the destination address into the host/port pair printed below
  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "send sync-pre-snapshot to %s:%d {term:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, s);

  syncNodeEventLog(pSyncNode, eventLine);
}

// Emit a node event line describing a received sync-pre-snapshot.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       message that was received; srcId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the source address into the host/port pair printed below
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "recv sync-pre-snapshot from %s:%d {term:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, s);

  syncNodeEventLog(pSyncNode, eventLine);
}

// Emit a node event line describing an outgoing sync-pre-snapshot-reply.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       reply being sent; destId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the destination address into the host/port pair printed below
  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "send sync-pre-snapshot-reply to %s:%d "
           "{term:%" PRIu64 ", snap-start:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->snapStart, s);

  syncNodeEventLog(pSyncNode, eventLine);
}

// Emit a node event line describing a received sync-pre-snapshot-reply.
//
// pSyncNode  node handed to syncNodeEventLog()
// pMsg       reply that was received; srcId.addr is converted to host:port for the line
// s          caller-supplied text appended at the end of the line
//
// The line is built in a 256-byte stack buffer; snprintf() truncates anything longer.
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
  char     eventLine[256];
  char     peerHost[64];
  uint16_t peerPort;

  // split the source address into the host/port pair printed below
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  snprintf(eventLine, sizeof(eventLine),
           "recv sync-pre-snapshot-reply from %s:%d "
           "{term:%" PRIu64 ", snap-start:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->snapStart, s);

  syncNodeEventLog(pSyncNode, eventLine);
}
M
Minghao Li 已提交
3641 3642 3643 3644 3645 3646 3647 3648

// Logging hook for an outgoing sync-snapshot-send message.
// Currently a no-op: nothing is formatted or logged and all arguments are ignored.
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

// Logging hook for a received sync-snapshot-send message.
// Currently a no-op: nothing is formatted or logged and all arguments are ignored.
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

// Logging hook for an outgoing sync-snapshot-rsp message.
// Currently a no-op: nothing is formatted or logged and all arguments are ignored.
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}

// Logging hook for a received sync-snapshot-rsp message.
// Currently a no-op: nothing is formatted or logged and all arguments are ignored.
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  (void)pSyncNode;
  (void)pMsg;
  (void)s;
}