syncMain.c 121.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncSnapshot.h"
M
Minghao Li 已提交
33
#include "syncTimeout.h"
M
Minghao Li 已提交
34
#include "syncUtil.h"
M
Minghao Li 已提交
35
#include "syncVoteMgr.h"
M
Minghao Li 已提交
36 37

// ------ local funciton ---------
M
Minghao Li 已提交
38
// enqueue message ----
M
Minghao Li 已提交
39 40 41 42 43
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
44
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
45
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
// process message ----
M
Minghao Li 已提交
48 49
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
50

51
/*
 * Open a sync node from pSyncInfo and register it.
 * Returns the node's ref id on success; returns -1 if opening or registering fails.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // the ref id returned by syncNodeAdd is what syncNodeAcquire(rid) later looks up
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid >= 0) {
    return pNode->rid;
  }

  syncNodeClose(pNode);
  return -1;
}
M
Minghao Li 已提交
66

M
Minghao Li 已提交
67
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
68 69 70 71
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStart(pNode);
    syncNodeRelease(pNode);
M
Minghao Li 已提交
72 73 74
  }
}

M
Minghao Li 已提交
75
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
76 77 78
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
79
    syncNodeRemove(rid);
M
Minghao Li 已提交
80 81 82
  }
}

M
Minghao Li 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
/*
 * Validate a proposed configuration against the node's current one.
 * The new config is accepted only if this node is a member of it and its
 * replica count differs from the current count by at most one.
 */
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
  if (!syncNodeInConfig(pSyncNode, pNewCfg)) {
    return false;
  }

  // at most one replica may be added or removed per change
  return pNewCfg->replicaNum <= pSyncNode->replicaNum + 1 && pNewCfg->replicaNum >= pSyncNode->replicaNum - 1;
}

M
Minghao Li 已提交
100
/*
 * Build a TDMT_SYNC_CONFIG_CHANGE message carrying pNewCfg serialized as a
 * NUL-terminated string. The message is marked noResp.
 *
 * Returns 0 on success. Returns -1 and sets terrno in these cases:
 * - the node is unknown;
 * - the new config fails syncNodeCheckNewConfig;
 * - a buffer cannot be allocated.
 */
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // log while the reference is still held; pSyncNode must not be read after release
    sError("invalid new config. vgId:%d", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
  if (newconfig == NULL) {
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
  pRpcMsg->info.noResp = 1;
  pRpcMsg->contLen = strlen(newconfig) + 1;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont == NULL) {
    taosMemoryFree(newconfig);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
  taosMemoryFree(newconfig);

  syncNodeRelease(pSyncNode);
  return 0;
}

S
Shengliang Guan 已提交
128
/*
 * Apply pNewCfg to the node registered under rid.
 *
 * The configuration is applied locally (new config index, then config change)
 * rather than proposed as a log entry. If this node is leader, its per-peer
 * heartbeat timers are rebuilt for the current replicasId array and
 * replication is kicked off.
 *
 * Returns 0 on success. Returns -1 and sets terrno if the node is unknown or
 * the config is rejected by syncNodeCheckNewConfig.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // log while the reference is still held; pSyncNode must not be read after release
    sError("invalid new config. vgId:%d", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // the replica set may have changed: re-init every heartbeat timer slot for the current replicasId
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
176

177
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
178
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
179
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
180 181
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
M
Minghao Li 已提交
182 183 184
  }
  ASSERT(rid == pSyncNode->rid);

M
Minghao Li 已提交
185
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
186
  syncNodeRelease(pSyncNode);
187 188 189 190
  return ret;
}

/*
 * Request a leader transfer to newLeader on the node registered under rid.
 * Returns -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR if rid is unknown;
 * otherwise the result of syncNodeLeaderTransferTo.
 */
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pNode->rid);

  int32_t code = syncNodeLeaderTransferTo(pNode, newLeader);

  syncNodeRelease(pNode);
  return code;
}

M
Minghao Li 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
/*
 * Smallest match index recorded for any peer in pSyncNode->pMatchIndex.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    lowest = (candidate < lowest) ? candidate : lowest;
  }
  return lowest;
}

219 220 221 222 223 224 225 226 227 228 229 230 231 232
char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
  int32_t len = 128;
  int32_t useLen = 0;
  int32_t leftLen = len - useLen;
  char*   pStr = taosMemoryMalloc(len);
  memset(pStr, 0, len);

  char*   p = pStr;
  int32_t use = snprintf(p, leftLen, "{");
  useLen += use;
  leftLen -= use;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i]));
M
Minghao Li 已提交
233
    if (pState == NULL) {
234
      sError("vgId:%d, replica maybe dropped", pSyncNode->vgId);
M
Minghao Li 已提交
235 236
      break;
    }
237 238

    p = pStr + useLen;
S
Shengliang Guan 已提交
239
    use = snprintf(p, leftLen, "%d:%" PRId64 " ,%" PRId64, i, pState->lastSendIndex, pState->lastSendTime);
240 241 242 243 244 245 246 247 248 249 250 251 252 253
    useLen += use;
    leftLen -= use;
  }

  p = pStr + useLen;
  use = snprintf(p, leftLen, "}");
  useLen += use;
  leftLen -= use;

  // sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr);

  return pStr;
}

254
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
255
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
256 257 258 259 260 261 262
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  int32_t code = 0;

M
Minghao Li 已提交
263
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
264 265 266
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
267 268 269
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
270 271 272
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
M
Minghao Li 已提交
273
      char logBuf[256];
S
Shengliang Guan 已提交
274 275 276
      snprintf(logBuf, sizeof(logBuf),
               "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex,
               logNum, isEmpty);
M
Minghao Li 已提交
277 278
      syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
279
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
280 281 282
      return 0;
    }

M
Minghao Li 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
              char logBuf[256];
              snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
302 303
                       "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                       " of %s:%d, do not delete wal",
M
Minghao Li 已提交
304 305 306 307
                       lastApplyIndex, matchIndex, host, port);
              syncNodeEventLog(pSyncNode, logBuf);
            } while (0);

S
Shengliang Guan 已提交
308
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
309 310 311 312 313 314 315 316
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
          char logBuf[256];
          snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
317 318
                   "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                   lastApplyIndex, pSyncNode->minMatchIndex);
M
Minghao Li 已提交
319 320
          syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
321
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
322 323 324 325
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
326
        char logBuf[256];
S
Shengliang Guan 已提交
327
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
328 329
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
330
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
331 332 333 334
        return 0;

      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
335 336
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " unknown state, do not delete wal",
                 lastApplyIndex);
M
Minghao Li 已提交
337 338
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
339
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
340 341 342 343 344 345 346 347 348
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
349 350 351
    }
  }

M
Minghao Li 已提交
352
_DEL_WAL:
353

M
Minghao Li 已提交
354
  do {
355 356 357 358
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
359
      pSyncNode->snapshottingTime = taosGetTimestampMs();
360

M
Minghao Li 已提交
361 362 363
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
364
        char logBuf[256];
S
Shengliang Guan 已提交
365
        snprintf(logBuf, sizeof(logBuf), "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
366 367 368
                 pSyncNode->snapshottingIndex, lastApplyIndex);
        syncNodeEventLog(pSyncNode, logBuf);

M
Minghao Li 已提交
369 370
      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
371 372 373
        snprintf(logBuf, sizeof(logBuf),
                 "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64, terrstr(terrno),
                 pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
374 375 376 377
        syncNodeErrorLog(pSyncNode, logBuf);

        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
378 379

    } else {
380
      char logBuf[256];
S
Shengliang Guan 已提交
381 382 383
      snprintf(logBuf, sizeof(logBuf),
               "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64, snapshottingIndex,
               lastApplyIndex);
384
      syncNodeEventLog(pSyncNode, logBuf);
385
    }
M
Minghao Li 已提交
386
  } while (0);
387

S
Shengliang Guan 已提交
388
  syncNodeRelease(pSyncNode);
389 390 391 392
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
393
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
394 395 396 397 398 399
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

400 401 402 403
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
404
    if (code != 0) {
M
Minghao Li 已提交
405
      sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno));
M
Minghao Li 已提交
406

S
Shengliang Guan 已提交
407
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
408 409 410 411
      return -1;
    } else {
      do {
        char logBuf[256];
S
Shengliang Guan 已提交
412 413
        snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%" PRId64,
                 atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
414 415
        syncNodeEventLog(pSyncNode, logBuf);
      } while (0);
416

M
Minghao Li 已提交
417 418
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
419
  }
420

S
Shengliang Guan 已提交
421
  syncNodeRelease(pSyncNode);
422 423 424
  return code;
}

M
Minghao Li 已提交
425
/*
 * Call syncNodeStepDown with newTerm on the node registered under rid.
 * Returns 0, or -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR if rid is unknown.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pNode->rid);

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);
  return 0;
}

M
Minghao Li 已提交
439 440
/*
 * Transfer leadership to the first entry of peersNodeInfo.
 * Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when there are no peers.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum != 0) {
    return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[0]);
  }

  sDebug("only one replica, cannot leader transfer");
  terrno = TSDB_CODE_SYN_ONE_REPLICA;
  return -1;
}

/*
 * Propose a leader-transfer message naming newLeader through syncNodePropose.
 *
 * Returns -1 and sets terrno in these cases:
 * - the group has a single replica (TSDB_CODE_SYN_ONE_REPLICA);
 * - the message cannot be built (TSDB_CODE_OUT_OF_MEMORY).
 * Otherwise returns the result of syncNodePropose.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  do {
    // 256 bytes like the other event-log buffers, so a long FQDN is not cut short
    char logBuf[256];
    snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
    syncNodeEventLog(pSyncNode, logBuf);
  } while (0);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // check before any field is written; writing through a NULL pMsg would crash
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

479
bool syncCanLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
480
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
481 482 483
  if (pSyncNode == NULL) {
    return false;
  }
M
Minghao Li 已提交
484
  ASSERT(rid == pSyncNode->rid);
485 486

  if (pSyncNode->replicaNum == 1) {
S
Shengliang Guan 已提交
487
    syncNodeRelease(pSyncNode);
488 489 490 491
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
S
Shengliang Guan 已提交
492
    syncNodeRelease(pSyncNode);
493 494 495 496 497 498 499 500 501 502 503 504 505 506
    return true;
  }

  bool matchOK = true;
  if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncIndex myCommitIndex = pSyncNode->commitIndex;
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
      if (peerMatchIndex < myCommitIndex) {
        matchOK = false;
      }
    }
  }

S
Shengliang Guan 已提交
507
  syncNodeRelease(pSyncNode);
508 509 510
  return matchOK;
}

511
/* Alias of syncPropose: propose pMsg on the node registered under rid. */
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
515

M
Minghao Li 已提交
516
ESyncState syncGetMyRole(int64_t rid) {
S
Shengliang Guan 已提交
517
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
518 519 520
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
521
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
522 523
  ESyncState state = pSyncNode->state;

S
Shengliang Guan 已提交
524
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
525
  return state;
M
Minghao Li 已提交
526 527
}

M
Minghao Li 已提交
528
bool syncIsReady(int64_t rid) {
S
Shengliang Guan 已提交
529
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
530 531 532
  if (pSyncNode == NULL) {
    return false;
  }
M
Minghao Li 已提交
533
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
534
  bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
S
Shengliang Guan 已提交
535
  syncNodeRelease(pSyncNode);
536 537 538 539 540 541 542 543 544

  // if false, set error code
  if (false == b) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }
M
Minghao Li 已提交
545 546 547
  return b;
}

M
Minghao Li 已提交
548
bool syncIsRestoreFinish(int64_t rid) {
S
Shengliang Guan 已提交
549
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
550 551 552
  if (pSyncNode == NULL) {
    return false;
  }
M
Minghao Li 已提交
553
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
554 555
  bool b = pSyncNode->restoreFinish;

S
Shengliang Guan 已提交
556
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
557 558 559
  return b;
}

560 561 562 563 564
/*
 * Fill pSnapshot with the log entry at index.
 * Sets lastApplyIndex, lastApplyTerm and lastConfigIndex; data is set to NULL.
 * Returns 0 on success. Returns -1 if:
 * - index < SYNC_INDEX_BEGIN;
 * - rid is unknown;
 * - the entry cannot be read from the log store.
 */
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }
  ASSERT(rid == pNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pNode);
  return 0;
}

592
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
593
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
594 595 596
  if (pSyncNode == NULL) {
    return -1;
  }
M
Minghao Li 已提交
597
  ASSERT(rid == pSyncNode->rid);
598 599
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

S
Shengliang Guan 已提交
600
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
601

S
Shengliang Guan 已提交
602
  syncNodeRelease(pSyncNode);
603 604 605
  return 0;
}

606
/*
 * Fill sMeta->lastConfigIndex with the config index that applies to a
 * snapshot taken at snapshotIndex. Returns 0, or -1 if rid is unknown.
 */
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  // same scan as syncNodeGetSnapshotConfigIndex; reuse it instead of keeping a second copy in sync
  sMeta->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, snapshotIndex);
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

630 631 632 633 634 635 636 637 638 639
/*
 * Config index for a snapshot ending at snapshotLastApplyIndex.
 *
 * The result is the largest entry of configIndexArr that is <=
 * snapshotLastApplyIndex. If no entry is larger than configIndexArr[0] under
 * that bound, the result falls back to configIndexArr[0] (even if that entry
 * itself exceeds the bound).
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int k = 0; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex cur = (pSyncNode->pRaftCfg->configIndexArr)[k];
    if (cur <= snapshotLastApplyIndex && cur > best) {
      best = cur;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

M
Minghao Li 已提交
646 647 648 649 650
const char* syncGetMyRoleStr(int64_t rid) {
  const char* s = syncUtilState2String(syncGetMyRole(rid));
  return s;
}

M
Minghao Li 已提交
651
bool syncRestoreFinish(int64_t rid) {
S
Shengliang Guan 已提交
652
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
653 654 655 656 657 658
  if (pSyncNode == NULL) {
    return false;
  }
  ASSERT(rid == pSyncNode->rid);
  bool restoreFinish = pSyncNode->restoreFinish;

S
Shengliang Guan 已提交
659
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
660 661 662
  return restoreFinish;
}

M
Minghao Li 已提交
663
SyncTerm syncGetMyTerm(int64_t rid) {
S
Shengliang Guan 已提交
664
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
665 666 667
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
668
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
669
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
670

S
Shengliang Guan 已提交
671
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
672
  return term;
M
Minghao Li 已提交
673 674
}

675
SyncIndex syncGetLastIndex(int64_t rid) {
S
Shengliang Guan 已提交
676
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
677 678 679 680 681 682
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);

S
Shengliang Guan 已提交
683
  syncNodeRelease(pSyncNode);
684 685 686 687
  return lastIndex;
}

SyncIndex syncGetCommitIndex(int64_t rid) {
S
Shengliang Guan 已提交
688
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
689 690 691 692 693 694
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex cmtIndex = pSyncNode->commitIndex;

S
Shengliang Guan 已提交
695
  syncNodeRelease(pSyncNode);
696 697 698
  return cmtIndex;
}

M
Minghao Li 已提交
699
SyncGroupId syncGetVgId(int64_t rid) {
S
Shengliang Guan 已提交
700
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
701
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
702 703
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
704
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
705
  SyncGroupId vgId = pSyncNode->vgId;
M
Minghao Li 已提交
706

S
Shengliang Guan 已提交
707
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
708
  return vgId;
M
Minghao Li 已提交
709 710
}

M
Minghao Li 已提交
711
/*
 * Fill pEpSet with every replica in the node's raft config, with inUse set to
 * this node's own index. pEpSet is zeroed if rid is unknown.
 */
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pSyncNode->rid);

  pEpSet->numOfEps = 0;
  for (int n = 0; n < pSyncNode->pRaftCfg->cfg.replicaNum; ++n) {
    const SNodeInfo* pInfo = &(pSyncNode->pRaftCfg->cfg.nodeInfo)[n];
    snprintf(pEpSet->eps[n].fqdn, sizeof(pEpSet->eps[n].fqdn), "%s", pInfo->nodeFqdn);
    pEpSet->eps[n].port = pInfo->nodePort;
    (pEpSet->numOfEps)++;
    sInfo("vgId:%d, sync get epset: index:%d %s:%d", pSyncNode->vgId, n, pEpSet->eps[n].fqdn, pEpSet->eps[n].port);
  }

  pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
  sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);

  syncNodeRelease(pSyncNode);
}
M
Minghao Li 已提交
730

731
/*
 * Fill pEpSet with every replica in the node's raft config, with inUse set to
 * the replica after this node (wrapping around), so a retry targets a
 * different replica. pEpSet is zeroed if rid is unknown.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pSyncNode->rid);

  pEpSet->numOfEps = 0;
  // give inUse a defined value even when the config lists no replicas; it is logged and returned below
  pEpSet->inUse = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
    pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
    (pEpSet->numOfEps)++;
    sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
          pEpSet->eps[i].port);
  }
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }
  sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);

  syncNodeRelease(pSyncNode);
}

M
Minghao Li 已提交
754
/*
 * Look up the response stub stored under index and, when syncRespMgrGet
 * returns 1, copy its rpcMsg into msg. Returns the syncRespMgrGet result, or
 * TAOS_SYNC_STATE_ERROR if rid is unknown.
 */
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  ASSERT(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGet(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
  }

  syncNodeRelease(pNode);
  return found;
}

S
Shengliang Guan 已提交
771
/*
 * Remove the response stub stored under index and, when
 * syncRespMgrGetAndDel returns 1, copy its rpc handle info into pInfo.
 * Returns the syncRespMgrGetAndDel result, or TAOS_SYNC_STATE_ERROR if rid is unknown.
 */
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  ASSERT(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGetAndDel(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    *pInfo = stub.rpcMsg.info;
  }

  // NOTE(review): pInfo->handle is logged even when no stub was found
  sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pNode->vgId, index, pInfo->handle);
  syncNodeRelease(pNode);
  return found;
}

789
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
S
Shengliang Guan 已提交
790
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
791
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
792
    sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid);
M
Minghao Li 已提交
793 794
    return;
  }
M
Minghao Li 已提交
795
  ASSERT(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
796
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
797

S
Shengliang Guan 已提交
798
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
799 800 801
}

char* sync2SimpleStr(int64_t rid) {
S
Shengliang Guan 已提交
802
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
803
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
804
    sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid);
M
Minghao Li 已提交
805 806
    return NULL;
  }
M
Minghao Li 已提交
807
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
808
  char* s = syncNode2SimpleStr(pSyncNode);
S
Shengliang Guan 已提交
809
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
810 811 812 813 814

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
S
Shengliang Guan 已提交
815
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
816 817 818
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
819
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
820 821 822
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

S
Shengliang Guan 已提交
823
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
824 825 826
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
S
Shengliang Guan 已提交
827
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
828 829 830
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
831
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
832 833
  pSyncNode->electBaseLine = electTimerMS;

S
Shengliang Guan 已提交
834
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
835 836 837
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
S
Shengliang Guan 已提交
838
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
839 840 841
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
842
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
843 844 845
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

S
Shengliang Guan 已提交
846
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
847 848
}

M
Minghao Li 已提交
849
/*
 * Public entry point: acquire the node identified by `rid` and forward the
 * proposal to syncNodePropose.
 *
 * Returns syncNodePropose's result. When the node cannot be acquired it
 * returns -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    // Nothing was acquired, so there is nothing to release.
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
  syncNodeRelease(pSyncNode);
  return ret;
}

863
/*
 * A batch is acceptable only if none of its messages is a config-change
 * message (TDMT_SYNC_CONFIG_CHANGE or TDMT_SYNC_CONFIG_CHANGE_FINISH).
 * An empty batch is acceptable.
 */
static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
  for (int32_t idx = 0; idx < arrSize; idx++) {
    const SRpcMsg* pCur = pMsgPArr[idx];
    bool isCfgMsg = pCur->msgType == TDMT_SYNC_CONFIG_CHANGE || pCur->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH;
    if (isCfgMsg) {
      return false;
    }
  }
  return true;
}

877
/*
 * Propose pMsg to the raft group through pSyncNode.
 *
 * A proposal is rejected (-1, reason in terrno) when:
 *   - the node is not the leader (TSDB_CODE_SYN_NOT_LEADER);
 *   - a config change is in flight and pMsg is not its
 *     TDMT_SYNC_CONFIG_CHANGE_FINISH message (TSDB_CODE_SYN_PROPOSE_NOT_READY);
 *   - the node has not finished restoring and vgId != 1
 *     (TSDB_CODE_SYN_PROPOSE_NOT_READY);
 *   - pMsg is a config change and syncNodeCanChange refuses it
 *     (TSDB_CODE_SYN_RECONFIG_NOT_READY).
 *
 * Returns:
 *    1  single-replica fast path: syncNodeOnClientRequest handled the request
 *       synchronously and pMsg->info.conn.applyIndex/applyTerm were filled in;
 *    0  the request was handed to FpEqMsg for asynchronous processing;
 *   -1  failure, reason in terrno.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  do {
    char eventLog[128];
    snprintf(eventLog, sizeof(eventLog), "propose message, type:%s", TMSG_INFO(pMsg->msgType));
    syncNodeEventLog(pSyncNode, eventLog);
  } while (0);

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // While a config change is in flight, only its FINISH message may pass.
  if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sError("vgId:%d, failed to sync propose since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // Not restored: reject (vgId 1 is exempt).
  // This check comes before a config change is marked in progress below.
  // Otherwise a config change rejected here would leave `changing` set and
  // block every later proposal.
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
           pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // config change
  bool isConfigChange = (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE);
  if (isConfigChange) {
    if (!syncNodeCanChange(pSyncNode)) {
      terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
      sError("vgId:%d, failed to sync reconfig since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
      return -1;
    }
    ASSERT(!pSyncNode->changing);
    pSyncNode->changing = true;
  }

  SRespStub stub;
  stub.createTime = taosGetTimestampMs();
  stub.rpcMsg = *pMsg;
  uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

  SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (pSyncMsg == NULL) {
    // Undo the bookkeeping done above so nothing waits for this request.
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (isConfigChange) {
      pSyncNode->changing = false;
    }
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to build sync client request, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
    return -1;
  }

  SRpcMsg rpcMsg;
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);

  int32_t ret = -1;
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Optimized one replica: handle the request in place instead of queueing it.
    // rpcMsg is never handed to anyone on this path, so its content and the
    // response stub are released here whatever the outcome.
    SyncIndex retIndex = SYNC_INDEX_INVALID;  // initialized: it is also logged on failure
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
    rpcFreeCont(rpcMsg.pCont);
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);

    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      ret = 1;
      sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
    } else {
      ret = -1;
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
    }
  } else if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
    ret = 0;
  } else {
    ret = -1;
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);

    if (pSyncNode->FpEqMsg == NULL) {
      // Nobody took the message, so its content is still ours to free.
      rpcFreeCont(rpcMsg.pCont);
    }
    // NOTE(review): when FpEqMsg itself fails, ownership of rpcMsg.pCont is
    // decided by the callback, so it is not freed here. Confirm per callback.

    // The request will never be processed: drop its response stub and clear
    // the config-change marker set above.
    syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    if (isConfigChange) {
      pSyncNode->changing = false;
    }
  }

  syncClientRequestDestroy(pSyncMsg);
  return ret;
}

969 970 971 972 973 974 975 976 977 978 979 980
/*
 * Prepare a per-peer heartbeat timer descriptor aimed at destId.
 * The timer is left unarmed, the counter and logic clock are zeroed, the
 * interval is taken from pSyncNode->hbBaseLine, and the callback is
 * syncNodeEqPeerHeartbeatTimer.
 * Always returns 0.
 */
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

/*
 * Arm the per-peer heartbeat timer pSyncTimer.
 *
 * A fresh SSyncHbTimerData is allocated. It records the node, the timer, the
 * destination and the timer's current logic clock, and is passed to the
 * callback.
 *
 * Returns 0 on success, and also 0 when the sync env is stopped (only an
 * error is logged). Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * callback data cannot be allocated.
 */
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start ctrl hb timer error, out of memory", pSyncNode->vgId);
    return -1;
  }
  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  // logicClock is updated with atomic ops elsewhere (init/stop); read it the same way
  pData->logicClock = atomic_load_64(&pSyncTimer->logicClock);

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

/*
 * Disarm the per-peer heartbeat timer pSyncTimer. Always returns 0.
 *
 * The logic clock is advanced first, so it no longer matches the value that
 * syncHbTimerStart copied into the callback data. The callback presumably
 * compares the two; confirm.
 * NOTE(review): pSyncTimer->pData is not released here (a free was commented
 * out). Confirm who owns it.
 */
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  return 0;
}

S
Shengliang Guan 已提交
1005 1006
/*
 * Create and initialise a sync node from pSyncInfo. The node is returned
 * without being started (see syncNodeStart / syncNodeStartStandBy).
 *
 * Raft config handling (<path>/raft_config.json):
 *   - file missing: it is created from pSyncInfo->syncCfg;
 *   - file present: if the caller passed a non-empty config that differs from
 *     the stored one, the stored config is overwritten with it; otherwise the
 *     stored config is copied back into pSyncInfo->syncCfg.
 *
 * Ownership of pSyncInfo->pFsm moves to the node and pSyncInfo->pFsm is set
 * to NULL. On failure the FSM is freed, either below (before the move) or by
 * syncNodeClose (after it).
 *
 * Returns the new node, or NULL on failure.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId first: the logs below print pSyncNode->vgId, which would
  // otherwise still be 0 from calloc.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // The file was just written from pSyncInfo->syncCfg, so there is nothing
      // to copy back. pSyncNode->pRaftCfg is not opened yet (still NULL here)
      // and must not be dereferenced.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  // Copy as a string: a fixed-size memcpy of sizeof(pSyncNode->path) bytes
  // could read past the end of a shorter source path.
  snprintf(pSyncNode->path, sizeof(pSyncNode->path), "%s", pSyncInfo->path);
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  // configPath was already built above from the same pSyncInfo->path.

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
  pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int j = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;  // ownership moved to the node; syncNodeClose frees it from here on
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commit index starts at the FSM snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // init callback
  pSyncNode->FpOnPing = syncNodeOnPing;
  pSyncNode->FpOnPingReply = syncNodeOnPingReply;
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
  pSyncNode->FpOnTimeout = syncNodeOnTimer;
  pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;
  pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVote;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply;

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  syncNodeEventLog(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo if the failure happened before the hand-over above
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when that
 * is larger. The commit index is never lowered.
 * Does nothing if the FSM offers no FpGetSnapshotInfo, or if that call fails.
 * The snapshot is zero-initialised (as in syncNodeOpen), so a partially filled
 * result is never read as garbage.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    // guard for builds where ASSERT does not abort
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
1316 1317
/*
 * Start raft on an opened node, then arm the ping timer.
 *
 * A multi-replica node starts as follower. A single-replica node advances its
 * term and becomes leader straight away. It then appends a no-op and tries to
 * advance the commit index, per Raft 3.6.2 (committing entries from previous
 * terms).
 */
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingRet == 0);
}

M
Minghao Li 已提交
1335 1336 1337 1338 1339 1340 1341 1342 1343
/*
 * Start a node in standby mode.
 * The node becomes follower, its heartbeat timer is stopped, its election
 * timer is re-armed with TIMER_MAX_MS, and the ping timer is started.
 */
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t rc = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(rc == 0);

  rc = syncNodeStartPingTimer(pSyncNode);
  ASSERT(rc == 0);
}

M
Minghao Li 已提交
1350
/*
 * Destroy a sync node and free it. A NULL pSyncNode is a no-op.
 *
 * syncNodeOpen also calls this on its error path, so any member may still be
 * NULL here.
 *
 * Timers are stopped first. Their callbacks were armed with pSyncNode (or with
 * data pointing at it) and should not run against half-destroyed members.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  syncNodeEventLog(pSyncNode, "sync close");

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1399
// option
M
Minghao Li 已提交
1400 1401
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1402
// Snapshot strategy recorded in the node's raft config (pRaftCfg must be open).
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->pRaftCfg->snapshotStrategy;
  return strategy;
}
M
Minghao Li 已提交
1403

M
Minghao Li 已提交
1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
// ping --------------
/*
 * Log pMsg, serialise it into an RPC message, and send it to destRaftId.
 * Returns the result of syncNodeSendMsgById.
 */
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg out;
  syncPing2RpcMsg(pMsg, &out);
  syncRpcMsgLog2((char*)"==syncNodePing==", &out);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &out);
}

/*
 * Send a ping whose source and destination are both this node's raft id.
 * Returns syncNodePing's result (asserted to be 0).
 */
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId);
  int32_t   rc = syncNodePing(pSyncNode, &pPing->destId, pPing);
  ASSERT(rc == 0);

  syncPingDestroy(pPing);
  return rc;
}

/*
 * Ping every peer (all replicas except this node).
 * Returns the result of the last ping, or 0 when there are no peers.
 */
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t rc = 0;
  for (int peer = 0; peer < pSyncNode->peersNum; peer++) {
    SRaftId*  pDest = &pSyncNode->peersId[peer];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    rc = syncNodePing(pSyncNode, pDest, pPing);
    ASSERT(rc == 0);

    syncPingDestroy(pPing);
  }
  return rc;
}

/*
 * Ping every replica in the raft config, including this node itself.
 * Returns the result of the last ping, or 0 when there are no replicas.
 */
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t rc = 0;
  for (int idx = 0; idx < pSyncNode->pRaftCfg->cfg.replicaNum; idx++) {
    SRaftId*  pDest = &pSyncNode->replicasId[idx];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    rc = syncNodePing(pSyncNode, pDest, pPing);
    ASSERT(rc == 0);

    syncPingDestroy(pPing);
  }
  return rc;
}

// timer control --------------
/*
 * Arm the node's ping timer with pingTimerMS and sync the timer's logic clock
 * to the user clock.
 * If the sync env is not initialised it only logs an error.
 * Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Disarm the ping timer. The user logic clock is bumped first, so it no
 * longer equals the value syncNodeStartPingTimer stored in
 * pingTimerLogicClock.
 * Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * A fresh SElectTimer is allocated. It carries the node and the current
 * election logic clock, and is passed to the callback.
 *
 * Returns 0 on success, and also 0 when the sync env is stopped (only an
 * error is logged). Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * timer data cannot be allocated.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start elect timer error, out of memory", pSyncNode->vgId);
    return -1;
  }
  // electTimerLogicClock is modified with atomic ops (see syncNodeStopElectTimer)
  pElectTimer->logicClock = atomic_load_64(&pSyncNode->electTimerLogicClock);
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Disarm the election timer. The logic clock is advanced first, so it differs
 * from the value captured in any SElectTimer handed to a pending callback.
 * Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/*
 * Stop the election timer and re-arm it with `ms` milliseconds.
 * Returns the first non-zero result of stop/start, so callers that check the
 * return value (e.g. ASSERT(ret == 0)) see a failed restart instead of an
 * unconditional 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t ret = syncNodeStopElectTimer(pSyncNode);
  if (ret != 0) {
    return ret;
  }
  return syncNodeStartElectTimer(pSyncNode, ms);
}

M
Minghao Li 已提交
1507 1508
/*
 * Re-arm the election timer with a fresh timeout and log the chosen value.
 * A standby node uses TIMER_MAX_MS. Any other node uses a random value from
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 * Returns syncNodeRestartElectTimer's result.
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t lo = pSyncNode->electBaseLine;
  int32_t hi = 2 * pSyncNode->electBaseLine;
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy ? TIMER_MAX_MS : syncUtilElectRandomMS(lo, hi);

  int32_t rc = syncNodeRestartElectTimer(pSyncNode, electMS);

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", lo, hi, electMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return rc;
}

M
Minghao Li 已提交
1528
/*
 * Arm the node-wide heartbeat timer.
 *
 * The timer fires FpHeartbeatTimerCB with pSyncNode after heartbeatTimerMS.
 * Once armed, heartbeatTimerLogicClockUser is published as the current
 * heartbeatTimerLogicClock.
 *
 * If the sync env is not initialized, an error is logged and no timer is armed.
 * The "start heartbeat timer" event is logged only when the timer was armed.
 *
 * Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return 0;
}

M
Minghao Li 已提交
1547
/*
 * Start the heartbeat timer of every peer.
 *
 * For each id in peersId, the peer's timer is looked up with
 * syncNodeGetHbTimer and started with syncHbTimerStart. Peers with no timer
 * (NULL) are skipped.
 *
 * Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  // Disabled: the single node-wide heartbeat timer path, kept for reference.
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
    if (pSyncTimer != NULL) {
      syncHbTimerStart(pSyncNode, pSyncTimer);
    }
  }

  return ret;
}

M
Minghao Li 已提交
1565 1566
/*
 * Stop the heartbeat timer of every peer.
 *
 * For each id in peersId, the peer's timer is looked up with
 * syncNodeGetHbTimer and stopped with syncHbTimerStop. Peers with no timer
 * (NULL) are skipped.
 *
 * Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  // Disabled: stop path of the single node-wide heartbeat timer, kept for reference.
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
    if (pSyncTimer != NULL) {
      syncHbTimerStop(pSyncNode, pSyncTimer);
    }
  }

  return ret;
}

1584 1585 1586 1587 1588 1589
/*
 * Restart heartbeating: stop the heartbeat timers, then start them again.
 *
 * The results of the stop/start calls are ignored; this always returns 0.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1590 1591 1592 1593
// utils --------------
/*
 * Send pMsg to the node identified by destRaftId through FpSendMsg.
 *
 * The message body is converted with syncUtilMsgHtoN (host to network order)
 * and marked noResp before it is handed off.
 *
 * Returns 0 once the message has been handed to FpSendMsg. Returns -1, after
 * logging an error, if FpSendMsg is not set; in that case the message is not
 * converted.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl: convert the body to network byte order before sending
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the node described by nodeInfo through FpSendMsg.
 *
 * The message body is converted with syncUtilMsgHtoN (host to network order)
 * and marked noResp before it is handed off.
 *
 * Returns 0 once the message has been handed to FpSendMsg. Returns -1, after
 * logging an error, if FpSendMsg is not set. This failure path previously
 * reported success (0); it now matches syncNodeSendMsgById.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl: convert the body to network byte order before sending
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
1623
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
C
Cary Xu 已提交
1624
  char   u64buf[128] = {0};
M
Minghao Li 已提交
1625 1626
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
1627 1628 1629
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
1630
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
1631
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
1632 1633 1634
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
1635 1636 1637
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
1638
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1639 1640 1641 1642
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
1643
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
1665

M
Minghao Li 已提交
1666 1667 1668 1669 1670 1671
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
1672

M
Minghao Li 已提交
1673 1674 1675 1676 1677 1678 1679
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
1680
    // life cycle
S
Shengliang Guan 已提交
1681
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->rid);
M
Minghao Li 已提交
1682 1683
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
1684 1685 1686
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
1687
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
S
Shengliang Guan 已提交
1699
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->commitIndex);
M
Minghao Li 已提交
1700 1701
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
1702 1703 1704 1705 1706
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
1707 1708 1709 1710
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
S
Shengliang Guan 已提交
1711
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
1712
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1713
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1714 1715 1716
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
S
Shengliang Guan 已提交
1717
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
1718 1719 1720 1721 1722 1723
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
S
Shengliang Guan 已提交
1724
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
1725 1726 1727
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
S
Shengliang Guan 已提交
1728
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
M
Minghao Li 已提交
1729 1730 1731 1732 1733 1734
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
S
Shengliang Guan 已提交
1735
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
1736
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1737
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
1738 1739 1740
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
S
Shengliang Guan 已提交
1741
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771

    // restoreFinish
    cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);

    // snapshot senders
    cJSON* pSenders = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "senders", pSenders);
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
    }

    // snapshot receivers
    cJSON* pReceivers = cJSON_CreateArray();
1772
    cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
M
Minghao Li 已提交
1773 1774 1775

    // changing
    cJSON_AddNumberToObject(pRoot, "changing", pSyncNode->changing);
M
Minghao Li 已提交
1776 1777 1778 1779 1780 1781 1782
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
1783 1784 1785 1786 1787 1788 1789
/*
 * Render pSyncNode as pretty-printed JSON text.
 *
 * The intermediate cJSON tree is released here. The returned string was
 * allocated by cJSON_Print and is owned by the caller.
 */
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);  // the text copy is independent of the tree
  return pText;
}

1790
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1791 1792 1793 1794
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1795
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1796
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1797 1798
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1799 1800 1801 1802 1803 1804 1805

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }
M
Minghao Li 已提交
1806

M
Minghao Li 已提交
1807
  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
1808 1809 1810 1811
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1812

1813 1814 1815
  char*   peerStateStr = syncNodePeerState2Str(pSyncNode);
  int32_t userStrLen = strlen(str) + strlen(peerStateStr);

M
Minghao Li 已提交
1816
  if (userStrLen < 256) {
M
Minghao Li 已提交
1817
    char logBuf[256 + 256];
1818 1819
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1820 1821
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1822 1823 1824
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1825
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1826
               pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1827 1828 1829 1830
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1831
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1832 1833 1834
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
1835
    // sDebug("%s", logBuf);
M
Minghao Li 已提交
1836 1837
    // sInfo("%s", logBuf);
    sTrace("%s", logBuf);
M
Minghao Li 已提交
1838

M
Minghao Li 已提交
1839
  } else {
M
Minghao Li 已提交
1840
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1841
    char* s = (char*)taosMemoryMalloc(len);
1842 1843
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1844 1845
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1846 1847 1848
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1849
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1850
               pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1851 1852 1853 1854
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1855
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1856 1857 1858
    } else {
      snprintf(s, len, "%s", str);
    }
1859
    // sDebug("%s", s);
M
Minghao Li 已提交
1860 1861
    // sInfo("%s", s);
    sTrace("%s", s);
M
Minghao Li 已提交
1862 1863
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1864

M
Minghao Li 已提交
1865
  taosMemoryFree(peerStateStr);
M
Minghao Li 已提交
1866
  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1867 1868
}

1869
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1870 1871 1872 1873
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1874 1875 1876
  int32_t userStrLen = strlen(str);

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1877
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1878 1879
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }

  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1893 1894

  if (userStrLen < 256) {
M
Minghao Li 已提交
1895
    char logBuf[256 + 256];
1896 1897
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1898 1899
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1900 1901 1902
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1903
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1904
               pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1905 1906 1907 1908
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1909
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1910 1911 1912
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
M
Minghao Li 已提交
1913 1914 1915
    sError("%s", logBuf);

  } else {
M
Minghao Li 已提交
1916
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1917
    char* s = (char*)taosMemoryMalloc(len);
1918 1919
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1920 1921
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1922 1923 1924
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1925
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1926
               pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1927 1928 1929 1930
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1931
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1932 1933 1934
    } else {
      snprintf(s, len, "%s", str);
    }
M
Minghao Li 已提交
1935 1936 1937
    sError("%s", s);
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1938 1939

  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1940 1941
}

1942
inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1943 1944
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
M
Minghao Li 已提交
1945 1946 1947 1948 1949 1950 1951 1952

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);

M
Minghao Li 已提交
1953
  snprintf(s, len,
M
Minghao Li 已提交
1954 1955 1956 1957
           "vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
           ", sby:%d, "
           "r-num:%d, "
           "lcfg:%" PRId64 ", chging:%d, rsto:%d",
M
Minghao Li 已提交
1958 1959 1960 1961
           pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
           pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
           pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);

M
Minghao Li 已提交
1962 1963 1964
  return s;
}

1965
/*
 * Report whether this node appears in `config`.
 *
 * Membership is checked two ways, and the two results are ASSERTed to agree:
 * 1. by comparing fqdn and port with myNodeInfo;
 * 2. by comparing the raft id built from each entry (addr from fqdn/port,
 *    vgId of this node) with myRaftId.
 *
 * Each scan stops at the first match.
 */
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByAddr = false;
  for (int idx = 0; idx < config->replicaNum && !foundByAddr; ++idx) {
    const SNodeInfo* pEntry = &(config->nodeInfo)[idx];
    foundByAddr = (strcmp(pEntry->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                  (pEntry->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  bool foundById = false;
  for (int idx = 0; idx < config->replicaNum && !foundById; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[idx].nodeFqdn, (config->nodeInfo)[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;
    foundById = syncUtilSameId(&candidate, &(pSyncNode->myRaftId)) ? true : false;
  }

  ASSERT(foundByAddr == foundById);
  return foundByAddr;
}

1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004
/*
 * Return true if pNewCfg differs from pOldCfg in any of:
 * - replica count,
 * - this node's index (myIndex),
 * - the fqdn or port of any replica at the same position.
 * Otherwise return false.
 */
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  bool sameShape = (pOldCfg->replicaNum == pNewCfg->replicaNum) && (pOldCfg->myIndex == pNewCfg->myIndex);
  if (!sameShape) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];
    bool             sameNode = (pBefore->nodePort == pAfter->nodePort) && (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0);
    if (!sameNode) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
2005
/*
 * Switch pSyncNode to the raft configuration pNewConfig, recorded at log index
 * lastConfigChangeIndex.
 *
 * If the replica set and myIndex are unchanged (syncIsConfigChanged), it only
 * logs and returns.
 *
 * Otherwise it:
 * - stores the new cfg and lastConfigIndex in pRaftCfg;
 * - clears isStandBy when this node is in the new config, and sets it when the
 *   node was in the old config but has been dropped;
 * - adds the config index via raftCfgAddConfigIndex.
 *
 * If this node is in the new config, it also:
 * - rebuilds its membership view (myNodeInfo/myRaftId, peers, replicas,
 *   quorum, next/match index managers, vote managers);
 * - moves each existing snapshot sender to its replica's new index, creates
 *   senders for the remaining slots, and destroys old senders that were not
 *   reused;
 * - persists the cfg;
 * - re-enters its role: a leader calls syncNodeBecomeLeader again, appends a
 *   no-op and tries to advance the commit index; any other state becomes
 *   follower.
 *
 * If this node is not in the new config, it only persists the cfg.
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "begin do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders
    int32_t oldReplicaNum = pSyncNode->replicaNum;
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];

      char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old");
      syncNodeEventLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int j = 0;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        j++;
      }
    }
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear new
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: move the sender of each surviving replica to its new index
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;

      // Only the first oldReplicaNum entries of oldReplicasId are live. Slots
      // past replicaNum are never rewritten, so they keep stale ids that can
      // duplicate a live one. Scan live slots only, skip senders already
      // taken, and stop at the first match, so a sender is never assigned
      // twice (leaking one) and never dereferenced as NULL.
      for (int k = 0; k < oldReplicaNum && k < TSDB_MAX_REPLICA; ++k) {
        if (oldSenders[k] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k])) {
          continue;
        }

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);

        do {
          char eventLog[256];
          snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p",
                   (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);
          syncNodeEventLog(pSyncNode, eventLog);
        } while (0);

        (pSyncNode->senders)[i] = oldSenders[k];
        oldSenders[k] = NULL;
        reset = true;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        do {
          char eventLog[256];
          snprintf(eventLog, sizeof(eventLog),
                   "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
                   port, (pSyncNode->senders)[i], reset);
          syncNodeEventLog(pSyncNode, eventLog);
        } while (0);

        break;
      }
    }

    // create new
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);

        char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new");
        syncNodeEventLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      }
    }

    // free old
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        // log first: the pointer value is indeterminate once destroyed
        do {
          char eventLog[128];
          snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
          syncNodeEventLog(pSyncNode, eventLog);
        } while (0);

        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);
    syncNodeEventLog(pSyncNode, tmpbuf);
  }

  // log end config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "end do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);
}

M
Minghao Li 已提交
2222 2223 2224 2225
// raft state change --------------
// Adopt `term` when it is newer than the stored current term: persist it, step
// down to follower (logging "update term to <term>"), then clear the vote that
// was cast in the old term. A term that is not newer leaves the node untouched.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRIu64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

2233 2234 2235 2236 2237 2238
// Persist `term` if it is newer than the current term. Unlike syncNodeUpdateTerm,
// the node's role and its recorded vote are left as they are.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (pSyncNode->pRaftStore->currentTerm < term) {
    raftStoreSetTerm(pSyncNode->pRaftStore, term);
  }
}

M
Minghao Li 已提交
2239 2240 2241 2242 2243
// Step down to follower on behalf of `newTerm`, which must not be older than the
// current term (asserted).
//  - newTerm is newer: persist it, become follower, and clear the old vote.
//  - newTerm equals the current term: only become follower if not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm);

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "step down, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm,
           pSyncNode->pRaftStore->currentTerm);
  syncNodeEventLog(pSyncNode, logBuf);

  if (pSyncNode->pRaftStore->currentTerm >= newTerm) {
    // term unchanged: demote only when needed
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRIu64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

2263 2264
// Clean up the pending responses held by this node's response manager
// (delegates to syncRespCleanRsp). syncNodeBecomeFollower calls this under its
// "send rsp to client" step.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

2265
// Transition this node to follower.
//
// Clears the leader cache if we were leader, stops the heartbeat timer, resets the
// election timer, cleans pending client responses, invokes the FSM's
// FpBecomeFollowerCb (if set), resets minMatchIndex, and logs
// "become follower <debugStr>".
//
// debugStr: human-readable reason for the transition; must be a non-NULL C string.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  // Short reasons fit the stack buffer. Long reasons get a heap buffer sized for
  // prefix + reason; the size passed to snprintf must be that full capacity, not
  // the reason length. If allocation fails, fall back to the (truncating) stack
  // buffer instead of dereferencing NULL.
  char   stackLog[256 + 64];
  char*  eventLog = stackLog;
  size_t eventLogCap = sizeof(stackLog);
  size_t debugStrLen = strlen(debugStr);
  if (debugStrLen >= 256) {
    char* heapLog = taosMemoryMalloc(debugStrLen + 64);
    if (heapLog != NULL) {
      eventLog = heapLog;
      eventLogCap = debugStrLen + 64;
    }
  }
  snprintf(eventLog, eventLogCap, "become follower %s", debugStr);
  syncNodeEventLog(pSyncNode, eventLog);
  if (eventLog != stackLog) {
    taosMemoryFree(eventLog);
  }
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
2323
// Transition this node to leader.
//
// In order, this function:
//  - records leaderTime and clears restoreFinish;
//  - sets state and leaderCache (to our own id);
//  - resets every next index to (last index + 1) and every match index to invalid;
//  - re-initializes peer states and bumps our own snapshot sender's private term;
//  - force-stops a running new-node snapshot receiver;
//  - swaps the election timer for the heartbeat timer and heartbeats peers immediately;
//  - invokes the FSM's FpBecomeLeaderCb (if set) and resets minMatchIndex;
//  - logs "become leader <debugStr>".
//
// debugStr: human-readable reason for the transition; must be a non-NULL C string.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Next index for each replica starts right after our last index. The last index
  // considers the snapshot as well as the log, because the wal may have been deleted.
  // The value does not depend on the replica, so it is computed once.
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // may overwrite our own slot too, which is harmless
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // may overwrite our own slot too, which is harmless
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // update sender private term: raise ours to the max over all senders, then add 100
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  // Short reasons fit the stack buffer. Long reasons get a heap buffer sized for
  // prefix + reason; snprintf must receive that full capacity, not the reason length.
  // If allocation fails, fall back to the (truncating) stack buffer.
  char   stackLog[256 + 64];
  char*  eventLog = stackLog;
  size_t eventLogCap = sizeof(stackLog);
  size_t debugStrLen = strlen(debugStr);
  if (debugStrLen >= 256) {
    char* heapLog = taosMemoryMalloc(debugStrLen + 64);
    if (heapLog != NULL) {
      eventLog = heapLog;
      eventLogCap = debugStrLen + 64;
    }
  }
  snprintf(eventLog, eventLogCap, "become leader %s", debugStr);
  syncNodeEventLog(pSyncNode, eventLog);
  if (eventLog != stackLog) {
    taosMemoryFree(eventLog);
  }
}

// Promote a candidate that holds a majority of granted votes to leader.
// After the transition it appends a no-op entry and tries to advance the commit
// index (Raft 3.6.2: committing entries from previous terms). With more than one
// replica it then starts replication.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  // preconditions: we are a candidate and won the election
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
2423 2424
// True when this sync node belongs to vgroup 1, which this module treats as the mnode.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = pSyncNode->vgId == 1;
  return isMnode;
}

M
Minghao Li 已提交
2425 2426 2427 2428 2429 2430 2431
// Reset all TSDB_MAX_REPLICA peer-state slots: no entry sent yet, no send time.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int idx = 0;
  while (idx < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[idx].lastSendTime = 0;
    pSyncNode->peerStates[idx].lastSendIndex = SYNC_INDEX_INVALID;
    ++idx;
  }
  return 0;
}

// Switch a follower's role to candidate and log it. Only the state field changes
// here; term, votes and timers are not touched by this function.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  const char* transition = "follower to candidate";

  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  syncNodeEventLog(pSyncNode, transition);
}

// Demote the current leader to follower via syncNodeBecomeFollower, then log the
// transition. The node must be leader (asserted).
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  const char* transition = "leader to follower";

  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, transition);
  syncNodeEventLog(pSyncNode, transition);
}

// Send a candidate back to follower via syncNodeBecomeFollower, then log the
// transition. The node must be candidate (asserted).
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  const char* transition = "candidate to follower";

  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, transition);
  syncNodeEventLog(pSyncNode, transition);
}

// raft vote --------------
M
Minghao Li 已提交
2456 2457 2458

// Only called by syncNodeVoteForSelf;
// its preconditions are enforced with ASSERT.
M
Minghao Li 已提交
2459
// Record a vote for `pRaftId` in the raft store.
// Preconditions (asserted): `term` is the current term and no vote has been cast in it yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  SyncTerm currentTerm = pSyncNode->pRaftStore->currentTerm;
  ASSERT(term == currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
2466
// simulate get vote from outside
M
Minghao Li 已提交
2467 2468 2469
// Vote for ourselves in the current term.
// The vote is recorded in the raft store. A synthetic granted RequestVoteReply
// (src == dest == myRaftId) is then fed to the granted/responded vote managers,
// simulating a vote received from outside.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pSelfReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pSelfReply->srcId = pSyncNode->myRaftId;
  pSelfReply->destId = pSyncNode->myRaftId;
  pSelfReply->term = pSyncNode->pRaftStore->currentTerm;
  pSelfReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pSelfReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pSelfReply);

  syncRequestVoteReplyDestroy(pSelfReply);
}

M
Minghao Li 已提交
2481
// snapshot --------------
M
Minghao Li 已提交
2482 2483

// return if has a snapshot
M
Minghao Li 已提交
2484 2485
// True when the FSM reports a snapshot whose lastApplyIndex is at least
// SYNC_INDEX_BEGIN. An FSM without FpGetSnapshotInfo never has a snapshot.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
2496 2497
// return max(logLastIndex, snapshotLastIndex)
// if there is neither a snapshot nor a log, return -1
2498
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2499
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2500 2501
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2502 2503 2504 2505 2506 2507 2508
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2509 2510
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2511 2512
// Term of the newest entry known to this node.
// Without a snapshot this is the log's last term. With one, it is the log's last
// term if the log extends beyond the snapshot, otherwise the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex <= snapshot.lastApplyIndex) {
    return snapshot.lastApplyTerm;
  }
  return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}

// get last index and term along with snapshot
// Fill *pLastIndex / *pLastTerm with the newest index and its term, taking the
// snapshot into account. Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
2541

M
Minghao Li 已提交
2542
// return the first index to try when sending append-entries
M
Minghao Li 已提交
2543 2544 2545 2546 2547
// First index to try when sending append-entries: one past the newest index
// known from the log or the snapshot.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2548 2549
// if index > 0, return index - 1
// else, return -1
2550 2551 2552 2553 2554 2555 2556 2557 2558
// Index immediately before `index`, clamped so it never goes below SYNC_INDEX_INVALID.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex candidate = index - 1;
  return (candidate < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : candidate;
}

M
Minghao Li 已提交
2559 2560 2561 2562
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575
// Term of the entry at (index - 1).
//  - index < SYNC_INDEX_BEGIN  -> SYNC_TERM_INVALID
//  - index == SYNC_INDEX_BEGIN -> 0 (nothing precedes the first entry)
//  - otherwise: the term from the log entry if the log has it; else the snapshot's
//    lastApplyTerm if the snapshot ends exactly at (index - 1); else an error is
//    logged and SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  if (code == 0) {
    // found in the log
    ASSERT(pPreEntry != NULL);
    SyncTerm preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  }

  // Not readable from the log (presumably already compacted into a snapshot):
  // accept the snapshot boundary if it sits exactly at preIndex.
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf),
           "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRIu64, index,
           snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  syncNodeErrorLog(pSyncNode, logBuf);

  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2606 2607 2608 2609

// get pre index and term of "index"
// Fill *pPreIndex / *pPreTerm with the index and term of the entry preceding
// `index`. Always returns 0; an unknown term is reported as SYNC_TERM_INVALID.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIndex;
  *pPreTerm = preTerm;
  return 0;
}

M
Minghao Li 已提交
2614 2615 2616
// for debug --------------
// Debug helper: print the serialized node to stdout and flush all streams.
void syncNodePrint(SSyncNode* pObj) {
  char*   serialized = syncNode2Str(pObj);
  int32_t len = (int32_t)strlen(serialized);

  printf("syncNodePrint | len:%d | %s \n", len, serialized);
  fflush(NULL);

  taosMemoryFree(serialized);
}

// Debug helper: print a caller tag `s` plus the serialized node to stdout and flush.
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char*   serialized = syncNode2Str(pObj);
  int32_t len = (int32_t)strlen(serialized);

  printf("syncNodePrint2 | len:%d | %s | %s \n", len, s, serialized);
  fflush(NULL);

  taosMemoryFree(serialized);
}

// Trace-log the serialized node.
void syncNodeLog(SSyncNode* pObj) {
  char*   serialized = syncNode2Str(pObj);
  int32_t len = (int32_t)strlen(serialized);
  sTraceLong("syncNodeLog | len:%d | %s", len, serialized);
  taosMemoryFree(serialized);
}

// Trace-log a caller tag plus the serialized node; only when gRaftDetailLog is set,
// so the (potentially large) serialization is skipped otherwise.
void syncNodeLog2(char* s, SSyncNode* pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char*   serialized = syncNode2Str(pObj);
  int32_t len = (int32_t)strlen(serialized);
  sTraceLong("syncNodeLog2 | len:%d | %s | %s", len, s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
2643 2644
// Trace-log a caller tag plus the serialized node, unconditionally.
void syncNodeLog3(char* s, SSyncNode* pObj) {
  char*   serialized = syncNode2Str(pObj);
  int32_t len = (int32_t)strlen(serialized);
  sTraceLong("syncNodeLog3 | len:%d | %s | %s", len, s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
2649
// ------ local function ---------
M
Minghao Li 已提交
2650
// enqueue message ----
M
Minghao Li 已提交
2651 2652
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
2653
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
2654
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
2655
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2656 2657
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
2658
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
2659
    if (pSyncNode->FpEqMsg != NULL) {
2660 2661
      int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
      if (code != 0) {
S
Shengliang Guan 已提交
2662
        sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
2663 2664 2665 2666
        rpcFreeCont(rpcMsg.pCont);
        syncTimeoutDestroy(pSyncMsg);
        return;
      }
M
Minghao Li 已提交
2667 2668 2669
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
2670 2671
    syncTimeoutDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2672 2673
    if (syncIsInit()) {
      taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
2674 2675 2676 2677 2678
                   &pSyncNode->pPingTimer);
    } else {
      sError("sync env is stop, syncNodeEqPingTimer");
    }

M
Minghao Li 已提交
2679
  } else {
S
Shengliang Guan 已提交
2680
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64,
M
Minghao Li 已提交
2681
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
2682 2683 2684 2685
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
M
Minghao Li 已提交
2686 2687
  SElectTimer* pElectTimer = (SElectTimer*)param;
  SSyncNode*   pSyncNode = pElectTimer->pSyncNode;
M
Minghao Li 已提交
2688

M
Minghao Li 已提交
2689 2690
  SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
                                            pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2691 2692 2693 2694 2695 2696 2697 2698
  SRpcMsg      rpcMsg;
  syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
  if (pSyncNode->FpEqMsg != NULL) {
    int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2699
      taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2700
      return;
2701
    }
M
Minghao Li 已提交
2702 2703 2704

    do {
      char logBuf[128];
M
Minghao Li 已提交
2705
      snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock);
M
Minghao Li 已提交
2706
      syncNodeEventLog(pSyncNode, logBuf);
M
Minghao Li 已提交
2707 2708
    } while (0);

M
Minghao Li 已提交
2709
  } else {
M
Minghao Li 已提交
2710 2711
    sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
  }
M
Minghao Li 已提交
2712

M
Minghao Li 已提交
2713
  syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2714
  taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2715

M
Minghao Li 已提交
2716
#if 0
M
Minghao Li 已提交
2717
  // reset timer ms
S
Shengliang Guan 已提交
2718
  if (syncIsInit() && pSyncNode->electBaseLine > 0) {
M
Minghao Li 已提交
2719
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
S
Shengliang Guan 已提交
2720
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
M
Minghao Li 已提交
2721 2722 2723
                 &pSyncNode->pElectTimer);
  } else {
    sError("sync env is stop, syncNodeEqElectTimer");
M
Minghao Li 已提交
2724
  }
M
Minghao Li 已提交
2725
#endif
M
Minghao Li 已提交
2726 2727
}

M
Minghao Li 已提交
2728 2729
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
2730 2731 2732

  syncNodeEventLog(pSyncNode, "eq hb timer");

2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744
  if (pSyncNode->replicaNum > 1) {
    if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
        atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
      SyncTimeout* pSyncMsg =
          syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
                            pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
      SRpcMsg rpcMsg;
      syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
      syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
      if (pSyncNode->FpEqMsg != NULL) {
        int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
        if (code != 0) {
S
Shengliang Guan 已提交
2745
          sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
2746 2747 2748 2749 2750
          rpcFreeCont(rpcMsg.pCont);
          syncTimeoutDestroy(pSyncMsg);
          return;
        }
      } else {
2751
        sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
2752
      }
2753
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2754

S
Shengliang Guan 已提交
2755 2756
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
2757 2758 2759 2760
                     &pSyncNode->pHeartbeatTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }
2761
    } else {
2762 2763 2764
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
             "",
             pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
2765
    }
M
Minghao Li 已提交
2766 2767 2768
  }
}

2769 2770 2771 2772 2773
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
2774 2775 2776 2777
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

S
Shengliang Guan 已提交
2778
  // syncNodeEventLog(pSyncNode, "eq peer hb timer");
2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
2790
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
      if (pSyncNode->FpEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
      }
#endif

      // send msg
M
Minghao Li 已提交
2812
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
2813 2814 2815

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2816 2817
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock,
             msgLogicClock);
    }
  }
}

M
Minghao Li 已提交
2830 2831
// Build a no-op entry at the log's write index for the current term and enqueue it,
// wrapped in a client request, through FpEqMsg. Must be called on the leader (asserted).
//
// Returns 0 on success or when no FpEqMsg is set, and -1 if the enqueue fails.
// On failure, or without FpEqMsg, the rpc payload is released here because no queue
// took ownership of it.
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  ASSERT(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  ASSERT(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->FpEqMsg != NULL) {
    int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsg);
    if (code != 0) {
      sError("vgId:%d, sync enqueue noop msg error, code:%d", ths->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      ret = -1;
    }
  } else {
    sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
    rpcFreeCont(rpcMsg.pCont);
  }

  syncEntryDestory(pEntry);
  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);

  return ret;
}

2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873
// LRU-cache deleter registered by syncCacheEntry: frees the cached value (the raft
// entry). The key is not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Insert `pEntry` into the log store's LRU cache, keyed by its index, with low
// priority. The charge is the struct size plus its payload. deleteCacheEntry frees
// the entry when the cache drops it. *h receives the handle from the insert call.
// Returns 0 if the cache reports TAOS_LRU_STATUS_OK, otherwise -1.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int charge = sizeof(*pEntry) + pEntry->dataLen;

  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2874 2875 2876
// Build a no-op entry at the log's write index for the current term, cache it, and,
// when this node is leader, append it to the log store.
//
// Ownership: if caching produced a handle `h`, the cache owns the entry and we only
// release our handle; otherwise we destroy the entry ourselves. This cleanup runs on
// every path, including a failed append.
//
// Returns 0 on success, -1 if the append fails.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      syncNodeErrorLog(ths, "append noop error");
      // NOTE(review): the entry stays in the cache even though the append failed;
      // confirm whether it should be erased instead of just released.
      ret = -1;
    }
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

M
Minghao Li 已提交
2902
// on message ----
M
Minghao Li 已提交
2903 2904 2905
// Handle a sync-ping: build a ping-reply from (myRaftId, pMsg->srcId) and send it
// to the reply's destId. Always returns 0.
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
  sTrace("vgId:%d, recv sync-ping", ths->vgId);

  SyncPingReply* pReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        replyRpc;
  syncPingReply2RpcMsg(pReply, &replyRpc);

  syncNodeSendMsgById(&pReply->destId, ths, &replyRpc);
  syncPingReplyDestroy(pReply);

  return 0;
}

M
Minghao Li 已提交
2923
// Handle a sync-ping-reply: only traced, no state change. Always returns 0.
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
  return 0;
}
M
Minghao Li 已提交
2928

2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940
// Handle a heartbeat from a leader and always answer it with a heartbeat-reply.
//
//  - Same term and this node is not leader: reset the election timer and adopt the
//    sender's minMatchIndex.
//  - Term >= ours and this node is not a follower: instead of stepping down inline,
//    enqueue a SYNC_LOCAL_CMD_STEP_DOWN local command carrying the sender's term.
//    If it cannot be enqueued, its rpc payload is released here.
//
// Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->FpEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      // nothing took ownership of the payload: release it instead of leaking it
      sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set, drop step-down msg.", ths->vgId);
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

/*
 * Handle a heartbeat reply: record, for the peer that replied, the local time
 * at which this reply was received so its liveness can be judged later.
 * Returns 0.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // The replying peer is srcId; destId is this node itself (syncNodeOnHeartbeat
  // addresses the reply back to the heartbeat's sender). The reply's startTime
  // is never filled in by syncNodeOnHeartbeat, so use the local receive time.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->srcId), taosGetTimestampMs());

  return 0;
}

M
Minghao Li 已提交
2997
/*
 * Execute a local command that this node queued to itself (for example the
 * step-down request enqueued by syncNodeOnHeartbeat). Only
 * SYNC_LOCAL_CMD_STEP_DOWN is understood; anything else is logged as an
 * error. Always returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd != SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeErrorLog(ths, "error local cmd");
    return 0;
  }

  syncNodeStepDown(ths, pMsg->sdNewTerm);
  return 0;
}

M
Minghao Li 已提交
3010 3011 3012 3013 3014 3015 3016 3017 3018 3019
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
3020

M
Minghao Li 已提交
3021
/*
 * Turn a client request into a raft log entry at the log store's next write
 * index (tagged with the current term) and put it in the entry cache.
 * On the leader the entry is appended to the log store. Then, with more than
 * one replica, replication starts immediately. With a single replica, commit
 * advancement is attempted right away.
 *
 * pRetIndex (optional) receives the new entry's index on success, or
 * SYNC_INDEX_INVALID on failure.
 *
 * Returns 0 on success, -1 if the entry could not be built or appended.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
  syncNodeEventLog(ths, "on client request");

  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index);
  ASSERT(pEntry != NULL);
  if (pEntry == NULL) {
    // NOTE(review): ASSERT may not abort in every build; never dereference a failed build
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      // del resp mgr, call FpCommitCb
      ASSERT(0);
      // do not return early: fall through so the cache handle / entry is released
      ret = -1;
    } else if (ths->replicaNum > 1) {
      // multiple replicas: start replicating right now
      syncNodeReplicate(ths);
    } else if (ths->replicaNum == 1) {
      // only myself: the entry may be committable right now
      syncMaybeAdvanceCommitIndex(ths);
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = (ret == 0) ? pEntry->index : SYNC_INDEX_INVALID;
  }

  // the cache owns the entry when a handle was obtained; otherwise we do
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}
M
Minghao Li 已提交
3071

S
Shengliang Guan 已提交
3072 3073 3074
/* Printable name of a raft role; "error" for any value that is not a known role. */
const char* syncStr(ESyncState state) {
  const char* name = "error";
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    name = "follower";
  } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
    name = "candidate";
  } else if (state == TAOS_SYNC_STATE_LEADER) {
    name = "leader";
  }
  return name;
}
3084

3085
/*
 * Apply a leader-transfer entry on this node.
 *
 * The transfer is only considered when all of the following hold:
 * - this node is a follower;
 * - it has finished restoring;
 * - the entry's term is not older than currentTerm;
 * - the entry's index is not behind the last log index.
 * Each rejected case is only logged.
 *
 * If the transfer target is this node (same raft id, or same fqdn and port),
 * the election timer is restarted with a 1 ms timeout so it fires almost
 * immediately. The FSM's FpLeaderTransferCb, when set, is notified in every
 * accepted case.
 *
 * Returns 0, or -1 if the transfer message carried by pRpcMsg cannot be decoded.
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    syncNodeEventLog(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little term:%" PRIu64 ", can not do leader transfer", pEntry->term);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
  if (pSyncLeaderTransfer == NULL) {
    // everything below dereferences the decoded message
    syncNodeErrorLog(ths, "decode leader transfer msg error");
    return -1;
  }

  do {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%" PRId64, pEntry->index);
    syncNodeEventLog(ths, logBuf);
  } while (0);

  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  bool same = sameId || sameNodeInfo;
  if (same) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    char eventLog[256];
    snprintf(eventLog, sizeof(eventLog), "maybe leader transfer to %s:%d %" PRIu64,
             pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
             pSyncLeaderTransfer->newLeaderId.addr);
    syncNodeEventLog(ths, eventLog);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {0};
    cbMeta.code = 0;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.flag = 0;
    cbMeta.index = pEntry->index;
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.state = ths->state;
    cbMeta.term = pEntry->term;
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175
/*
 * Find this node in pNewCfg and store its position in pNewCfg->myIndex.
 * Members are identified by the raft id derived from fqdn/port plus this
 * node's vgId.
 * Returns 0 when found, or -1 when this node is not in pNewCfg (myIndex is
 * then left unchanged).
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t idx = 0; idx < pNewCfg->replicaNum; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[idx].nodeFqdn, pNewCfg->nodeInfo[idx].nodePort);
    candidate.vgId = ths->vgId;

    if (!syncUtilSameId(&(ths->myRaftId), &candidate)) {
      continue;
    }

    pNewCfg->myIndex = idx;
    return 0;
  }

  return -1;
}

M
Minghao Li 已提交
3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197
/*
 * Apply a committed TDMT_SYNC_CONFIG_CHANGE_FINISH entry.
 *
 * Decodes the SyncReconfigFinish payload and, if the FSM registered
 * FpReConfigCb, reports the old and new configurations to it. It then clears
 * the node's "changing" flag and logs the transition.
 *
 * Returns 0, or -1 if the payload cannot be decoded.
 */
static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg);
  ASSERT(pFinish);
  if (pFinish == NULL) {
    // everything below dereferences pFinish
    syncNodeErrorLog(ths, "decode config change finish msg error");
    return -1;
  }

  if (ths->pFsm->FpReConfigCb != NULL) {
    SReConfigCbMeta cbMeta = {0};
    cbMeta.code = 0;
    cbMeta.index = pEntry->index;
    cbMeta.term = pEntry->term;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
    cbMeta.state = ths->state;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.flag = 0;

    cbMeta.oldCfg = pFinish->oldCfg;
    cbMeta.newCfg = pFinish->newCfg;
    cbMeta.newCfgIndex = pFinish->newCfgIndex;
    cbMeta.newCfgTerm = pFinish->newCfgTerm;
    cbMeta.newCfgSeqNum = pFinish->newCfgSeqNum;

    ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  // clear changing
  ths->changing = false;

  char  tmpbuf[512];
  char* oldStr = syncCfg2SimpleStr(&(pFinish->oldCfg));
  char* newStr = syncCfg2SimpleStr(&(pFinish->newCfg));
  // passing NULL to %s is undefined behaviour; substitute a placeholder
  snprintf(tmpbuf, sizeof(tmpbuf), "config change finish from %d to %d, index:%" PRId64 ", %s  -->  %s",
           pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex,
           oldStr != NULL ? oldStr : "null", newStr != NULL ? newStr : "null");
  taosMemoryFree(oldStr);
  taosMemoryFree(newStr);
  syncNodeEventLog(ths, tmpbuf);

  syncReconfigFinishDestroy(pFinish);

  return 0;
}

/*
 * Apply a committed TDMT_SYNC_CONFIG_CHANGE entry.
 *
 * Steps:
 * - mark the node as "changing";
 * - parse the new configuration from the entry payload;
 * - locate this node in it;
 * - apply it with syncNodeDoConfigChange at pEntry->index;
 * - record the old/new configurations and the entry's index/term/seqNum in
 *   pFinish, so a config-change-finish record can be proposed afterwards.
 *
 * Always returns 0; a parse failure is only caught by ASSERT.
 */
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
                                    SyncReconfigFinish* pFinish) {
  ths->changing = true;

  // configuration in effect before the change
  SSyncCfg before = ths->pRaftCfg->cfg;

  // configuration carried by the entry
  SSyncCfg after;
  int32_t  parseRet = syncCfgFromStr(pRpcMsg->pCont, &after);
  ASSERT(parseRet == 0);

  syncNodeUpdateNewConfigIndex(ths, &after);
  syncNodeDoConfigChange(ths, &after, pEntry->index);

  pFinish->oldCfg = before;
  pFinish->newCfg = after;
  pFinish->newCfgIndex = pEntry->index;
  pFinish->newCfgTerm = pEntry->term;
  pFinish->newCfgSeqNum = pEntry->seqNum;

  return 0;
}
3246

M
Minghao Li 已提交
3247 3248 3249 3250 3251 3252 3253 3254 3255
/*
 * Propose pFinish as a config-change-finish record. If the proposal is
 * rejected, the "changing" flag is cleared so syncNodeCanChange no longer
 * blocks on it. Always returns 0.
 */
static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) {
  SRpcMsg finishRpc;
  syncReconfigFinish2RpcMsg(pFinish, &finishRpc);

  if (syncNodePropose(ths, &finishRpc, false) == 0) {
    return 0;
  }

  sError("syncNodeProposeConfigChangeFinish error");
  ths->changing = false;
  return 0;
}

3259 3260 3261 3262
/*
 * True when pMsg qualifies for the single-replica optimization: the node has
 * exactly one replica, is not vgId 1, and the message type is a user commit.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (ths->vgId == 1) {
    return false;
  }
  return syncUtilUserCommit(pMsg->msgType);
}

M
Minghao Li 已提交
3263
/*
 * Apply committed log entries [beginIndex, endIndex] to the state machine.
 *
 * If the FSM reports a snapshot whose lastApplyIndex is at or beyond
 * beginIndex, the entries covered by it are skipped. Each remaining entry is
 * taken from the LRU cache when present, otherwise read from the log store
 * (entries that cannot be read are logged and skipped). For each entry:
 *   - user-commit entries are passed to FpCommitCb. The exception is a
 *     restored single-replica node whose vgId is not 1; there the entry is
 *     not executed here (per the original note, it is executed outside
 *     syncPropose);
 *   - TDMT_SYNC_CONFIG_CHANGE entries are applied, and a leader proposes the
 *     matching config-change-finish record;
 *   - TDMT_SYNC_CONFIG_CHANGE_FINISH entries with a payload complete the change;
 *   - the first time the last log index is applied, FpRestoreFinishCb is
 *     invoked and restoreFinish is set.
 *
 * flag is forwarded to FpCommitCb as cbMeta.flag.
 *
 * Returns 0, or -1 if ths is NULL.
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      char eventLog[128];
      snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
               snapshot.lastApplyIndex);
      syncNodeEventLog(ths, eventLog);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  int32_t code = 0;

  char eventLog[128];
  snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
  syncNodeEventLog(ths, eventLog);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // initialized so the NULL check below is meaningful when the read fails
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          if (code != 0 || pEntry == NULL) {
            syncNodeErrorLog(ths, "get log entry error");
            continue;
          }
        }

        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          do {
            char logBuf[128];
            snprintf(logBuf, sizeof(logBuf), "commit index:%" PRId64 ", internal:%d", i, internalExecute);
            syncNodeEventLog(ths, logBuf);
          } while (0);

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {0};
            cbMeta.index = pEntry->index;
            cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
            cbMeta.isWeak = pEntry->isWeak;
            cbMeta.code = 0;
            cbMeta.state = ths->state;
            cbMeta.seqNum = pEntry->seqNum;
            cbMeta.term = pEntry->term;
            cbMeta.currentTerm = ths->pRaftStore->currentTerm;
            cbMeta.flag = flag;

            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
          }
        }

        // config change
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
          SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId);
          ASSERT(pFinish != NULL);

          code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish);
          ASSERT(code == 0);

          if (ths->state == TAOS_SYNC_STATE_LEADER) {
            syncNodeProposeConfigChangeFinish(ths, pFinish);
          }
          syncReconfigFinishDestroy(pFinish);
        }

        // config change finish
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
          if (rpcMsg.pCont != NULL && rpcMsg.contLen > 0) {
            code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
            ASSERT(code == 0);
          }
        }

#if 0
        // execute in pre-commit
        // leader transfer
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
        }
#endif

        // restore finish
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;

            char restoreLog[128];
            snprintf(restoreLog, sizeof(restoreLog), "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms, ",
                     pEntry->index, restoreDelay);
            syncNodeEventLog(ths, restoreLog);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        // cached entries are owned by the cache; entries read from the store are ours
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
      }
    }
  }
  return 0;
}

/* True when pRaftId equals one of this node's current replica ids. */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t k = 0; k < ths->replicaNum && !found; ++k) {
    found = syncUtilSameId(&(ths->replicasId[k]), pRaftId);
  }
  return found;
}

/*
 * Snapshot sender of the replica whose id equals pDestId, or NULL if pDestId
 * is not a current replica. The scan runs from the back, so if an id appeared
 * more than once the highest slot wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t k = ths->replicaNum - 1; k >= 0; --k) {
    if (syncUtilSameId(pDestId, &(ths->replicasId[k]))) {
      return ths->senders[k];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
3425

3426 3427 3428 3429 3430 3431 3432 3433 3434 3435
/*
 * Per-peer heartbeat timer slot for replica pDestId, or NULL if pDestId is
 * not a current replica. The scan runs from the back, so the highest matching
 * slot wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t k = ths->replicaNum - 1; k >= 0; --k) {
    if (syncUtilSameId(pDestId, &(ths->replicasId[k]))) {
      return &(ths->peerHeartbeatTimerArr[k]);
    }
  }
  return NULL;
}

M
Minghao Li 已提交
3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447
/*
 * Per-peer send-state slot for replica pDestId, or NULL if pDestId is not a
 * current replica. The scan runs from the back, so the highest matching slot
 * wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t k = ths->replicaNum - 1; k >= 0; --k) {
    if (syncUtilSameId(pDestId, &(ths->replicasId[k]))) {
      return &(ths->peerStates[k]);
    }
  }
  return NULL;
}

/*
 * Decide whether pMsg should be sent to pDestId.
 * Returns false if the peer is unknown (logged as possibly dropped). Also
 * returns false if the same index (prevLogIndex + 1) was last sent to this
 * peer less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Otherwise true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  int64_t now = taosGetTimestampMs();
  bool    sameIndex = (pPeer->lastSendIndex == pMsg->prevLogIndex + 1);
  bool    tooSoon = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && tooSoon);
}

M
Minghao Li 已提交
3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478
/*
 * Whether a membership change may start now. Refused (with an error log) when:
 * - a change is already in progress;
 * - a commit index exists but differs from the last log index;
 * - a snapshot sender to any peer is active.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitted = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitted && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId[p]));
    bool                 snapshotting = (pSender != NULL) && pSender->start;
    if (snapshotting) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}

3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501
/* Printable name of a sync timer type; "unknown" for unrecognized values. */
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  switch (timerType) {
    case SYNC_TIMEOUT_PING:
      return "ping";
    case SYNC_TIMEOUT_ELECTION:
      return "elect";
    case SYNC_TIMEOUT_HEARTBEAT:
      return "heartbeat";
    default:
      return "unknown";
  }
}

/* Event-log receipt of a sync timer message; s is appended verbatim. */
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  const char* typeName = syncTimerTypeStr(pMsg->timeoutType);

  char eventBuf[256];
  snprintf(eventBuf, sizeof(eventBuf), "recv sync-timer {type:%s, lc:%" PRIu64 ", ms:%d, data:%p}, %s", typeName,
           pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3507
/* Event-log an outgoing RequestVote; the peer address comes from pMsg->destId. */
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3518
/* Event-log an incoming RequestVote; the peer address comes from pMsg->srcId. */
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3529
/* Event-log an outgoing RequestVote reply; the peer address comes from pMsg->destId. */
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3539
/* Event-log an incoming RequestVote reply; the peer address comes from pMsg->srcId. */
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3549
/* Event-log an outgoing AppendEntries; the peer address comes from pMsg->destId. */
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3564
/*
 * Event-log an incoming AppendEntries; the peer address comes from pMsg->srcId.
 * Index fields are signed (SyncIndex) and are printed with PRId64, so -1 shows
 * as -1 rather than 18446744073709551615. Field order (pterm before cmt)
 * matches the send-side loggers.
 */
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
  char logBuf[256];
  snprintf(logBuf, sizeof(logBuf),
           "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d}, %s",
           host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
           pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

wafwerar's avatar
wafwerar 已提交
3579
/* Event-log an outgoing batched AppendEntries; the peer address comes from pMsg->destId. */
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3592
/* Event-log an incoming batched AppendEntries; the peer address comes from pMsg->srcId. */
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

3605
/* Event-log an outgoing AppendEntries reply; the peer address comes from pMsg->destId. */
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
           "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

wafwerar's avatar
wafwerar 已提交
3617
/* Event-log an incoming AppendEntries reply; the peer address comes from pMsg->srcId. */
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
           "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}
3628 3629 3630 3631 3632 3633 3634

/* Event-log an outgoing heartbeat; the peer address comes from pMsg->destId. */
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->destId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "send sync-heartbeat to %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64
           "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

/* Event-log an incoming heartbeat; the peer address comes from pMsg->srcId. */
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf),
           "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64
           "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}

/*
 * Event-log an outgoing heartbeat reply. The logged address is the
 * destination (pMsg->destId), so the message says "to", like the other send
 * loggers.
 */
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
  char logBuf[256];
  snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply to %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           host, port, pMsg->term, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

/* Event-log an incoming heartbeat reply; the peer address comes from pMsg->srcId. */
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     eventBuf[256];
  char     peerFqdn[64];
  uint16_t peerPort = 0;

  syncUtilU642Addr(pMsg->srcId.addr, peerFqdn, sizeof(peerFqdn), &peerPort);
  snprintf(eventBuf, sizeof(eventBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           peerFqdn, peerPort, pMsg->term, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}
3672 3673 3674 3675 3676 3677 3678

/* Event-log receipt of a local command, showing its numeric code, name and step-down term. */
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  const char* cmdName = syncLocalCmdGetStr(pMsg->cmd);

  char eventBuf[256];
  snprintf(eventBuf, sizeof(eventBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd,
           cmdName, pMsg->sdNewTerm, s);
  syncNodeEventLog(pSyncNode, eventBuf);
}