syncMain.c 86.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
48 49 50 51 52 53 54 55 56 57 58 59
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
static SyncIndex     syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
M
Minghao Li 已提交
60

61
// Opens a sync node from pSyncInfo, registers it via syncNodeAdd() and copies the ping/elect/heartbeat
// periods and the message callbacks from pSyncInfo into it.
// Returns the node's rid, or -1 if opening or registering the node failed.
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: close the node we just opened
    syncNodeClose(pNode);
    return -1;
  }

  // current ping/heartbeat periods start out equal to their configured baselines
  pNode->msgcb = pSyncInfo->msgcb;
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  return pNode->rid;
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
84 85 86 87
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
88 89 90
  }
}

M
Minghao Li 已提交
91
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
93
  if (pSyncNode != NULL) {
94
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
95
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
96
    syncNodeRemove(rid);
M
Minghao Li 已提交
97
  }
S
Shengliang Guan 已提交
98
}
M
Minghao Li 已提交
99

M
Minghao Li 已提交
100 101
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
102 103 104 105
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
106 107
}

S
Shengliang Guan 已提交
108 109 110
// A new config is acceptable when this node is part of it and the replica count changes by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}

S
Shengliang Guan 已提交
113
// Applies pNewCfg to the node registered under rid.
// Returns 0 on success; -1 if rid is unknown or the new config is rejected by syncNodeCheckNewConfig()
// (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR in the latter case).
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // log while the reference is still held: the node must not be touched after syncNodeRelease()
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // Re-initialize every per-peer heartbeat timer from replicasId (presumably refreshed by
    // syncNodeDoConfigChange — confirm), then restart heartbeats and replication.
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
141

S
Shengliang Guan 已提交
142 143 144 145
// Dispatches an incoming sync message to the matching syncNodeOn* handler of the node registered under rid.
// Returns the handler's result, or -1 if the sync env is not initialized, rid is unknown, or the
// message type is not a sync message.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  if (!syncIsInit()) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }

  int32_t code = -1;
  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotReply(pNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pNode, pMsg);
      break;
    default:
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pNode->vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      code = -1;
      break;
  }

  syncNodeRelease(pNode);
  return code;
}

S
Shengliang Guan 已提交
193
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
194
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
195
  if (pSyncNode == NULL) return -1;
196

S
Shengliang Guan 已提交
197
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
198
  syncNodeRelease(pSyncNode);
199 200 201
  return ret;
}

M
Minghao Li 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
// Returns the smallest match index recorded for any peer, or SYNC_INDEX_INVALID when there are no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    // the first peer seeds the minimum; later peers can only lower it
    if (peer == 0 || idx < lowest) {
      lowest = idx;
    }
  }

  return lowest;
}

218
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
219
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
220
  if (pSyncNode == NULL) {
221
    sError("sync begin snapshot error");
222 223
    return -1;
  }
224

225 226
  int32_t code = 0;

M
Minghao Li 已提交
227
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
228 229 230
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
231 232 233
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
234 235 236
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
237 238
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
239
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
240 241 242
      return 0;
    }

M
Minghao Li 已提交
243 244 245
    goto _DEL_WAL;

  } else {
246 247 248 249 250 251 252 253 254 255 256 257
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
272 273 274 275
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
276 277
            } while (0);

S
Shengliang Guan 已提交
278
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
279 280 281 282 283 284
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
285 286 287
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
288
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
289 290 291 292
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
293
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
294
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
295 296 297
        return 0;

      } else {
S
Shengliang Guan 已提交
298
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
299
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
300 301 302 303 304 305 306 307 308
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
309 310 311
    }
  }

M
Minghao Li 已提交
312
_DEL_WAL:
313

M
Minghao Li 已提交
314
  do {
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
335

M
Minghao Li 已提交
336
      } else {
337 338
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
339
      }
340
    }
M
Minghao Li 已提交
341
  } while (0);
342

S
Shengliang Guan 已提交
343
  syncNodeRelease(pSyncNode);
344 345 346 347
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
348
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
349
  if (pSyncNode == NULL) {
350
    sError("sync end snapshot error");
351 352 353
    return -1;
  }

354 355 356 357
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
358
    if (code != 0) {
359
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
360
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
361 362
      return -1;
    } else {
S
Shengliang Guan 已提交
363
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
364 365
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
366
  }
367

S
Shengliang Guan 已提交
368
  syncNodeRelease(pSyncNode);
369 370 371
  return code;
}

M
Minghao Li 已提交
372
// Calls syncNodeStepDown(newTerm) on the node registered under rid.
// Returns 0 on success, -1 if rid is unknown.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

384 385 386
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
387
    sError("sync ready for read error");
388 389 390 391 392 393 394 395 396 397
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
398 399 400
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
401

402 403
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
404
        SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
405
        SSyncRaftEntry* pEntry = NULL;
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
        SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
        int32_t         code = 0;
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
          code = 0;

          sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);

        } else {
          sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);

          code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
        }

421 422 423 424 425
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

426 427 428 429 430
          if (h) {
            taosLRUCacheRelease(pCache, h, false);
          } else {
            syncEntryDestory(pEntry);
          }
431
        }
432 433 434 435
      }
    }
  }

436 437 438 439 440 441 442 443
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

444 445 446 447
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
448 449
// Transfers leadership away from this node.
// Fails with TSDB_CODE_SYN_ONE_REPLICA when there are no peers. Returns 0 without acting unless this
// node is a leader with more than one replica. The target is peer 0, or peer 1 when there are exactly
// two peers and peer 1 has the higher match index.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

// Proposes a leader-transfer entry naming newLeader.
// Returns -1 with TSDB_CODE_SYN_ONE_REPLICA for a single-replica group, -1 if the message cannot be
// built, otherwise syncNodePropose()'s result.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  // Never dereference pCont unless the build succeeded and actually produced a buffer.
  if (code != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "failed to build leader transfer msg since %s", terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

493 494
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
495

S
Shengliang Guan 已提交
496
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
497 498 499 500
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
501 502
  }

503
  return state;
M
Minghao Li 已提交
504 505
}

506
#if 0
// NOTE(review): everything up to the matching #endif is compiled out. syncGetSnapshotMetaByIndex repeats
// the search implemented by syncNodeGetSnapshotConfigIndex(); consider deleting or reusing it if revived.

// Fills pSnapshot from the log entry at index; returns -1 for an invalid index, unknown rid or missing entry.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry) != 0) {
    if (pEntry != NULL) syncEntryDestory(pEntry);
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

// Reports the node's last config index in sMeta.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

// Reports in sMeta the config index that applies at snapshotIndex.
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[i];
    if (candidate > lastIndex && candidate <= snapshotIndex) {
      lastIndex = candidate;
    }
  }
  sMeta->lastConfigIndex = lastIndex;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
577

578 579 580 581
// Picks the config index that applies at snapshotLastApplyIndex.
// The result is the largest configIndexArr entry that is <= snapshotLastApplyIndex. If configIndexArr[0]
// already exceeds snapshotLastApplyIndex, configIndexArr[0] is returned.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  // entry 0 seeds the search, so scanning can start at 1
  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t k = 1; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[k];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

S
Shengliang Guan 已提交
594
// Fills pEpSet with every replica endpoint from the node's raft config. inUse points at the replica
// after this node's own index (wrapping). pEpSet->numOfEps stays 0 if rid is unknown.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  const SSyncCfg* pCfg = &pNode->pRaftCfg->cfg;
  for (int32_t idx = 0; idx < pCfg->replicaNum; ++idx) {
    SEp* pEp = &pEpSet->eps[idx];
    tstrncpy(pEp->fqdn, pCfg->nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pCfg->nodeInfo[idx].nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pCfg->myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}
614

M
Minghao Li 已提交
615
// Proposes pMsg on the node registered under rid. Returns syncNodePropose()'s result, or -1 if rid is unknown.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t outcome = syncNodePropose(pNode, pMsg, isWeak);
    syncNodeRelease(pNode);
    return outcome;
  }

  sError("sync propose error");
  return -1;
}

627
// Proposes pMsg through this node.
// Returns:
//   1  - handled synchronously on the optimized one-replica path; pMsg->info.conn.applyIndex and
//        applyTerm are filled in;
//   0  - wrapped as a client request and enqueued through syncEqMsg;
//   -1 - rejected (not leader, not restored for vgId != 1, heartbeat reply timeout, or build failure),
//        with terrno set;
//   other non-zero - error returned by syncEqMsg when enqueueing failed.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized because it is logged on the failure path, where the callee may not have set it
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  // Regular path: remember the original message under a sequence number, then enqueue a client request.
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  return code;
}

S
Shengliang Guan 已提交
688
// Resets pSyncTimer as a peer heartbeat timer toward destId. The period is the node's hbBaseLine, the
// callback is syncNodeEqPeerHeartbeatTimer, and the counter and logic clock are zeroed. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->destId = destId;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
699
// Arms pSyncTimer. Its SSyncHbTimerData record (found by hbDataRid, or newly allocated and registered)
// is refreshed with the node rid, destination, logic clock and next execution time. The timer fires
// after timerMS / HEARTBEAT_TICK_NUM ms and receives the record's rid as its parameter.
// Returns 0 (also when the sync env is not initialized, which is only logged), or -1 on allocation failure.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();

  if (syncIsInit()) {
    // NOTE(review): a reference obtained from syncHbTimerDataAcquire() is never released in this
    // function; confirm whether a matching release is required to avoid pinning the record.
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(*pData));
      if (pData == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
725
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // Advance the logic clock before cancelling, presumably so a callback
  // already in flight sees a clock mismatch and does nothing (TODO confirm
  // in the callback).
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  // Cancel the timer and forget its handle.
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // Unregister the associated data record; -1 marks "no record".
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

S
Shengliang Guan 已提交
735 736
/*
 * Create and initialize a sync (raft) node described by pSyncInfo.
 *
 * Steps:
 *  1. Make sure pSyncInfo->path exists.
 *  2. If raft_config.json is missing, create it from the input options.
 *     Otherwise reconcile it with the input: a non-empty input config that
 *     differs from the file is persisted; in every other case the file's
 *     config is copied back into pSyncInfo->syncCfg.
 *  3. Open the raft config and derive self/peer/replica ids from it.
 *  4. Open the raft store, vote managers, index managers and log store.
 *  5. Seed the commit index from the FSM snapshot (when available).
 *  6. Initialize timers and snapshot helpers.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node (pSyncInfo->pFsm becomes
 * NULL). On any failure, an FSM still owned by pSyncInfo is freed, the
 * partially built node is closed, and NULL is returned.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId first so that every log line below reports the real vgroup
  // instead of the 0 left by calloc.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
      // pSyncNode->pRaftCfg has not been opened yet on this path (it is opened
      // further below), so it must not be dereferenced here. Read the file we
      // just created instead.
      pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
      if (pSyncNode->pRaftCfg == NULL) {
        sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
      raftCfgClose(pSyncNode->pRaftCfg);
      pSyncNode->pRaftCfg = NULL;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo (vgId was already set above)
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; the node takes ownership of the FSM from here on
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // The commit index starts at the snapshot's last applied index, if any.
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders (a NULL sender is tolerated here, as before)
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    (pSyncNode->senders)[i] = pSender;
    sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart / syncNodeStartStandBy

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;

  sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);

  return pSyncNode;

_error:
  // The FSM is still owned by pSyncInfo if we failed before the hand-over.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
/*
 * Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
 * snapshot is ahead. Does nothing when the FSM has no FpGetSnapshotInfo.
 *
 * The snapshot is zero-initialized and a non-zero return code aborts the
 * update. ASSERT is compiled out in release builds, so without this check the
 * code would compare against an indeterminate lastApplyIndex.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
1040 1041
void syncNodeStart(SSyncNode* pSyncNode) {
  // Multi-replica groups start as followers and wait for an election.
  // A single replica cannot hold an election with anyone else, so it bumps
  // its term and makes itself leader immediately.
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingCode == 0);
}

M
Minghao Li 已提交
1059 1060 1061 1062 1063 1064 1065 1066 1067
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // A stand-by node stays a follower and sends no heartbeats.
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // Push the election timeout out to TIMER_MAX_MS ("long enough").
  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
1074 1075 1076 1077 1078 1079 1080 1081
void syncNodePreClose(SSyncNode* pSyncNode) {
  // First shutdown phase: stop the timers that could start an election or
  // emit heartbeats. The ping timer is left alone here.
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
}

1082 1083
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  // Frees a heartbeat-timer data record (records are allocated with
  // taosMemoryMalloc in syncHbTimerStart).
  taosMemoryFree(pData);
}

M
Minghao Li 已提交
1084
/*
 * Destroy a sync node and everything it owns: raft store, response manager,
 * vote managers, index managers, log store, raft config, FSM, snapshot
 * senders and the new-node receiver. Safe to call with NULL, and used on
 * partially initialized nodes by syncNodeOpen's error path.
 *
 * Timers are stopped BEFORE any owned structure is torn down, so no timer is
 * left armed while the state it may consult is being freed.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNTrace(pSyncNode, "sync close, data:%p", pSyncNode);

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1131
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  // The snapshot strategy is kept in the node's raft config.
  ESyncStrategy strategy = pSyncNode->pRaftCfg->snapshotStrategy;
  return strategy;
}
M
Minghao Li 已提交
1132

M
Minghao Li 已提交
1133 1134 1135
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  // Without an initialized sync env there is no timer manager: just log.
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  // Sync the timer-side clock with the user-side clock; presumably the
  // callback compares the two to ignore stale firings (TODO confirm).
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  // Bump the user-side clock, then cancel the timer and drop its handle.
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  // Without an initialized sync env there is no timer manager: just log.
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  // Record when the timer is due and which logic clock armed it.
  int64_t dueAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), dueAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  // The callback receives the node's rid, not its address.
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  // Advance the election logic clock so the previous arm's recorded clock no
  // longer matches, then cancel the timer and drop its handle.
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  // Stop-then-start with the new period. Both helpers currently always
  // return 0, so their results are not inspected.
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1190 1191
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  // Stand-by nodes get the maximum timeout. Everyone else draws a random
  // timeout from [electBaseLine, 2 * electBaseLine].
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return code;
}

M
Minghao Li 已提交
1206
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  // Arm the node-wide heartbeat timer. Only a log line is written when the
  // sync env is down. The trace below is emitted in both cases.
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1220
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  // Heartbeats are driven by one timer per peer. The node-wide heartbeat
  // timer (syncNodeDoStartHeartbeatTimer) is not armed here.
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }
  return 0;
}

M
Minghao Li 已提交
1238 1239
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  // Stop every peer heartbeat timer. The node-wide heartbeat timer is not
  // touched here.
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }
  return 0;
}

1257 1258 1259 1260 1261 1262
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  // Stop-then-start all peer heartbeat timers; always reports success.
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1263 1264 1265
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  // Resolve the destination endpoint from the raft id.
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  // No send hook: the payload cannot be handed off, so free it and fail.
  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // Convert the payload to network byte order, mark it one-way, and send.
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the node described by nodeInfo via the node's send hook.
 *
 * Returns 0 when the message was handed to syncSendMSg. Returns -1 when no
 * send hook is installed; in that case the payload is freed, because nobody
 * else took ownership of it. This matches syncNodeSendMsgById; the previous
 * version leaked the payload and reported success.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);
  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return -1;
  }

  // htonl: convert the payload to network byte order before sending
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1297
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  // Is this node a member of `config`? Membership is computed two ways,
  // which are asserted to agree:
  //   1) by fqdn + port against myNodeInfo;
  //   2) by raft id (address encoding + vgId) against myRaftId.
  bool byEndpoint = false;
  for (int32_t idx = 0; idx < config->replicaNum && !byEndpoint; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    byEndpoint = (strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                 (pInfo->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  bool byRaftId = false;
  for (int32_t idx = 0; idx < config->replicaNum && !byRaftId; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[idx].nodeFqdn, (config->nodeInfo)[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;
    byRaftId = syncUtilSameId(&candidate, &(pSyncNode->myRaftId));
  }

  ASSERT(byEndpoint == byRaftId);
  return byEndpoint;
}

1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  // Two configs are "the same" when they have the same replica count, the
  // same self index, and identical fqdn:port at every position.
  bool sameShape = (pOldCfg->replicaNum == pNewCfg->replicaNum) && (pOldCfg->myIndex == pNewCfg->myIndex);
  if (!sameShape) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];
    if (pBefore->nodePort != pAfter->nodePort || strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) != 0) {
      return true;
    }
  }
  return false;
}

M
Minghao Li 已提交
1337
/*
 * Apply a new membership configuration at log index lastConfigChangeIndex.
 *
 * No-op (logged) when pNewConfig equals the current config. Otherwise:
 *  - install pNewConfig into the raft config and record the config index;
 *  - toggle isStandBy (cleared if this node is in the new config, set if it
 *    was dropped);
 *  - if this node is in the new config, rebuild self/peer/replica ids,
 *    quorum, index and vote managers, re-map snapshot senders by raft id
 *    (reusing existing senders, creating missing ones, destroying orphans),
 *    persist the config and re-enter the current role;
 *  - otherwise just persist the config.
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  // this node is being removed from the group
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they served
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reuse an old sender for every replica that is still present
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
          sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

          (pSyncNode->senders)[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                  i, host, port, (pSyncNode->senders)[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
      } else {
        sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
      }
    }

    // free old senders that were not reused
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
           pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1513 1514 1515 1516
// raft state change --------------

// Adopt a term observed from a peer when it is strictly newer than ours:
// persist it, become follower and clear the vote cast in the old term.
// Terms that are not newer are ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1524 1525 1526 1527 1528 1529
// Persist a strictly newer term without changing role or clearing the vote.
// Terms that are not newer than the current one are ignored.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1530
// Step down in response to a message that carries newTerm.
//  - newTerm older than ours: ignored (trace only).
//  - newTerm equal to ours: become follower unless already one.
//  - newTerm newer than ours: persist it, become follower and clear the vote.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = pSyncNode->pRaftStore->currentTerm;

  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (currentTerm == newTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1556 1557
// Clean pending client responses held by the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1558
// Demote this node to follower.
// Clears the cached leader id when we were the leader, stops the heartbeat
// timer, resets the election timer, cleans pending client responses, calls
// the FSM's FpBecomeFollowerCb (if installed) and resets minMatchIndex.
// debugStr is only used in the trace log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // as leader the cache held our own id (set in syncNodeBecomeLeader); drop it
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;

  // followers do not send heartbeats but must be able to time out into an election
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasFollowerCb = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeFollowerCb != NULL);
  if (hasFollowerCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1604
// Become leader in the current term.
// Records the leadership start time and count, clears restoreFinish, sets the
// state and the leader cache to ourselves, resets nextIndex (last index + 1)
// and matchIndex (invalid) for every replica, resets per-peer send state,
// force-stops a running new-node snapshot receiver, swaps the election timer
// for the heartbeat timer, sends heartbeats immediately, calls the FSM's
// FpBecomeLeaderCb (if installed) and resets minMatchIndex.
// debugStr is only used in the log line.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();
  pSyncNode->becomeLeaderNum++;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex of every replica (our own slot included, which is harmless) starts
  // right after our last entry. Use the snapshot-aware last index because the
  // WAL may have been deleted. The value does not depend on the replica, so it
  // is computed once rather than once per loop iteration.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = SYNC_TERM_INVALID;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  (void)code;
  (void)lastTerm;

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // nothing is known to be replicated yet (our own slot is overwritten too, harmless)
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // leaders do not run elections; they heartbeat, starting right now
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Promote a candidate holding a vote majority to leader.
// After becoming leader it appends a no-op in the new term (Raft 3.6.2,
// committing entries from previous terms), tries to advance the commit index,
// and starts replication when there is more than one replica.
// Preconditions (asserted): state is candidate and votes form a majority.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1696 1697
// True when this sync node belongs to vgroup 1, which this check treats as the mnode.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1698
// Reset the per-peer send bookkeeping (last sent index and time) for every
// replica slot up to TSDB_MAX_REPLICA. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    ++slot;
  }
  return 0;
}

// Switch a follower to candidate. Only the state field is updated here.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

// Demote the leader to follower (asserts the node currently is leader).
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  const char* reason = "leader to follower";
  syncNodeBecomeFollower(pSyncNode, reason);

  sNTrace(pSyncNode, "leader to follower");
}

// Demote a candidate to follower (asserts the node currently is candidate).
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  const char* reason = "candidate to follower";
  syncNodeBecomeFollower(pSyncNode, reason);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1725 1726
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1727
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1728 1729
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1730 1731 1732 1733

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1734
// simulate get vote from outside
M
Minghao Li 已提交
1735
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1736
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1737

S
Shengliang Guan 已提交
1738 1739
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1740
  if (ret != 0) return;
S
Shengliang Guan 已提交
1741 1742

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1743 1744 1745 1746 1747 1748 1749
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1750
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1751 1752
}

M
Minghao Li 已提交
1753
// return if has a snapshot
M
Minghao Li 已提交
1754 1755
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1756
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1757 1758
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1759 1760 1761 1762 1763 1764 1765
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1766 1767
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1768
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1769
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1770 1771
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1772 1773 1774 1775 1776 1777 1778
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1779 1780
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1781 1782
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1783 1784
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1785
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1786 1787
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1788 1789
    }

M
Minghao Li 已提交
1790 1791 1792
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1793 1794 1795 1796
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1797
  } else {
M
Minghao Li 已提交
1798 1799
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1800
  }
M
Minghao Li 已提交
1801

M
Minghao Li 已提交
1802 1803 1804 1805 1806 1807 1808
  return lastTerm;
}

// Get the last index and term, taking the snapshot into account.
// Both outputs are always written; the function always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1811

M
Minghao Li 已提交
1812
// return append-entries first try index
M
Minghao Li 已提交
1813 1814 1815 1816 1817
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1818 1819
// if index > 0, return index - 1
// else, return -1
1820 1821 1822 1823 1824 1825 1826 1827 1828
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1829 1830 1831 1832
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1833 1834 1835 1836 1837 1838 1839 1840 1841
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1842 1843 1844
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1845
  SSyncRaftEntry* pPreEntry = NULL;
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
1860 1861 1862 1863 1864 1865

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1866 1867 1868
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
1869 1870 1871 1872 1873 1874 1875

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pPreEntry);
    }

1876 1877
    return preTerm;
  } else {
1878 1879 1880 1881
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1882 1883 1884 1885
      }
    }
  }

1886
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1887
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1888 1889
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1890 1891 1892 1893

// Get the previous index and its term for "index". Both outputs are always
// written; the function always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  SyncTerm  preTerm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIndex;
  *pPreTerm = preTerm;
  return 0;
}

M
Minghao Li 已提交
1898
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1899 1900 1901 1902 1903
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1904
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1905 1906
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
1907
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
1908 1909
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1910
    }
M
Minghao Li 已提交
1911

M
Minghao Li 已提交
1912
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
1913 1914
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
1915
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
1916 1917
      rpcFreeCont(rpcMsg.pCont);
      return;
1918 1919
    }

S
Shengliang Guan 已提交
1920
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1921 1922 1923 1924
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1925
  if (!syncIsInit()) return;
S
Shengliang Guan 已提交
1926

M
Minghao Li 已提交
1927 1928
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
1929

1930
  if (pNode == NULL) return;
M
Minghao Li 已提交
1931 1932 1933 1934 1935

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
1936

1937
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
1938 1939 1940 1941
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
1942

S
Shengliang Guan 已提交
1943
  SRpcMsg rpcMsg = {0};
1944 1945
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
1946

S
Shengliang Guan 已提交
1947
  if (code != 0) {
M
Minghao Li 已提交
1948
    sError("failed to build elect msg");
M
Minghao Li 已提交
1949
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
1950 1951 1952 1953
    return;
  }

  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
1954
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
1955 1956 1957

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
1958
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
1959
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1960
    syncNodeRelease(pNode);
1961
    return;
S
Shengliang Guan 已提交
1962
  }
M
Minghao Li 已提交
1963 1964

  syncNodeRelease(pNode);
M
Minghao Li 已提交
1965 1966
}

M
Minghao Li 已提交
1967
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1968
  if (!syncIsInit()) return;
1969

S
Shengliang Guan 已提交
1970 1971 1972 1973
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1974
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
1975 1976 1977
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
1978
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
1979
        return;
1980
      }
M
Minghao Li 已提交
1981

1982
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
1983 1984
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
1985
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
1986 1987
        rpcFreeCont(rpcMsg.pCont);
        return;
1988
      }
S
Shengliang Guan 已提交
1989 1990 1991 1992

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1993
    } else {
S
Shengliang Guan 已提交
1994 1995
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1996
    }
M
Minghao Li 已提交
1997 1998 1999
  }
}

2000
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2001
  int64_t hbDataRid = (int64_t)param;
2002
  int64_t tsNow = taosGetTimestampMs();
2003 2004 2005

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2006
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2007 2008
    return;
  }
2009

2010
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2011
  if (pSyncNode == NULL) {
2012
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2013
    sError("hb timer get pSyncNode NULL");
2014 2015 2016 2017 2018 2019 2020 2021
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2022
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2023 2024 2025
    return;
  }

M
Minghao Li 已提交
2026
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2027 2028
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2029
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2030 2031 2032
    return;
  }

M
Minghao Li 已提交
2033
  if (pSyncNode->pRaftStore == NULL) {
2034 2035
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2036
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2037 2038 2039
    return;
  }

M
Minghao Li 已提交
2040
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2041 2042

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2043 2044 2045
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2046
    if (timerLogicClock == msgLogicClock) {
2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2067
        pSyncMsg->timeStamp = tsNow;
2068 2069 2070 2071 2072 2073

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2074 2075
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2076 2077 2078 2079 2080 2081 2082 2083
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2084 2085
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2086 2087
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2088 2089 2090 2091
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2092
    } else {
M
Minghao Li 已提交
2093 2094
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2095 2096
    }
  }
2097 2098 2099

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2100 2101
}

2102 2103 2104 2105 2106
// Propose a no-op entry in the current term by enqueueing it as a client
// request on the node's sync queue.
// Only a leader may propose. Non-leaders get terrno = TSDB_CODE_SYN_NOT_LEADER
// and -1 (the previous check was inverted and rejected the leader itself).
// Returns 0 on success; -1 if the entry or request cannot be built; otherwise
// the enqueue error code. On enqueue failure the request is freed here, as the
// other enqueue sites in this file do.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);
  if (code != 0) {
    sError("vgId:%d, failed to build noop client request", pNode->vgId);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2126 2127
// Deleter passed to taosLRUCacheInsert in syncCacheEntry: frees the cached
// value. The key and its length are not needed.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2128 2129 2130 2131
// Insert pEntry into the log store's LRU cache, keyed by its index, at low
// priority. The charge is the entry header plus its payload, and
// deleteCacheEntry frees it on removal. The resulting handle is written to *h.
// Returns 0 when the cache reports TAOS_LRU_STATUS_OK, otherwise -1.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

2143
// True when at least `quorum` peers last replied more than
// SYNC_HEART_TIMEOUT_MS ago, judged by the receive times in pMatchIndex.
// Peers whose recorded time is 0 or -1 are skipped (presumably "no reply
// recorded yet" - confirm in syncIndexMgr). Always false for a single replica.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) {
    return false;
  }

  int64_t now = taosGetTimestampMs();
  int32_t timedOut = 0;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    noRecord = (lastRecv == 0 || lastRecv == -1);
    if (!noRecord && now - lastRecv > SYNC_HEART_TIMEOUT_MS) {
      ++timedOut;
    }
  }

  return timedOut >= pSyncNode->quorum;
}

M
Minghao Li 已提交
2166 2167 2168
// Append a no-op entry for the current term at the log's write index.
// This happens only when the node is leader; otherwise the entry is built
// and discarded. Once appended, the entry is also inserted into the LRU
// cache. Returns 0, or -1 if the append fails.
//
// The caller still owns the entry when syncLogAppendEntry fails (see the
// failure path in syncNodeOnClientRequest), so it is destroyed before
// returning. Previously it leaked.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      syncEntryDestory(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  // a cached entry is owned by the cache: drop our handle; otherwise free it
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2195 2196
// Handle a heartbeat from the leader.
//  - Builds a SyncHeartbeatReply carrying our current term and the receive
//    time, and sends it back to the sender at the end.
//  - Same term and we are not leader: record the receive time in pNextIndex,
//    reset the election timer, adopt the leader's minMatchIndex and, as a
//    follower, enqueue a local FOLLOWER_CMT command with the leader's commit index.
//  - Heartbeat term >= ours and we are not a follower: enqueue a local
//    STEP_DOWN command with the heartbeat's term.
// Local commands go through the sync queue instead of running inline. Values
// needed for logging are copied before enqueueing, because the message is
// presumably owned by the queue afterwards. A local command that cannot be
// enqueued, including when no queue is attached, is freed here.
// Returns 0, or -1 when the reply could not be built (local processing still happens).
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff);

  // fill the reply before local processing so it reports the term at receipt time
  SRpcMsg             rpcMsg = {0};
  SyncHeartbeatReply* pMsgReply = NULL;
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0) {
    sError("vgId:%d, failed to build heartbeat reply", ths->vgId);
  } else {
    pMsgReply = rpcMsg.pCont;
    pMsgReply->destId = pMsg->srcId;
    pMsgReply->srcId = ths->myRaftId;
    pMsgReply->term = ths->pRaftStore->currentTerm;
    pMsgReply->privateTerm = 8864;  // magic number
    pMsgReply->timeStamp = tsMs;
  }

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, failed to build fc-commit msg", ths->vgId);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;
        SyncIndex fcIndex = pSyncMsg->fcIndex;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          // no queue to hand the command to
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, failed to build step-down msg", ths->vgId);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;
      SyncTerm sdNewTerm = pSyncMsg->sdNewTerm;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, sdNewTerm);
        }
      } else {
        // no queue to hand the command to
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsgReply == NULL) {
    return -1;
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2272 2273
// Handle a heartbeat reply.
// Logs it together with the difference between now and the timestamp in the
// reply, then records now as the last receive time for the sender in
// pMatchIndex. syncNodeHeartbeatReplyTimeout reads these times to decide
// whether peers are alive. Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}

S
Shengliang Guan 已提交
2284 2285
// Execute a local command that was enqueued by this node itself
// (see syncNodeOnHeartbeat): STEP_DOWN steps down to sdNewTerm,
// FOLLOWER_CMT runs a follower commit up to fcIndex. Unknown commands are
// logged as errors. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->sdNewTerm);
    return 0;
  }

  if (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pCmd->fcIndex);
    return 0;
  }

  sError("error local cmd");
  return 0;
}

M
Minghao Li 已提交
2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2311

2312
/**
 * Turn a client request into a raft log entry and, on the leader, append it.
 *
 * The entry is built at the log store's current write index with the current
 * term. TDMT_SYNC_CLIENT_REQUEST payloads are decoded from pMsg->pCont; any
 * other message type is wrapped as a raw rpc message.
 *
 * When this node is the leader, the entry is appended to the log store and
 * cached. It is then replicated to peers (more than one replica) or committed
 * locally (single replica). On a non-leader the entry is built and released
 * without being appended.
 *
 * Ownership: until syncCacheEntry succeeds this function owns pEntry and must
 * destroy it. Afterwards the cache owns it and only the handle is released.
 *
 * @param ths        sync node
 * @param pMsg       client request
 * @param pRetIndex  optional out-param: index of the built entry, or
 *                   SYNC_INDEX_INVALID if no entry could be built
 * @return 0 on success; -1 if the entry could not be built or appended
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  // building the entry allocates; fail cleanly instead of dereferencing NULL below
  if (pEntry == NULL) {
    sError("vgId:%d, failed to build raft entry for client request, index:%" PRId64, ths->vgId, index);
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb with an error code
        if (ths->pFsm->FpCommitCb != NULL) {
          SFsmCbMeta cbMeta = {
              .index = pEntry->index,
              .lastConfigIndex = SYNC_INDEX_INVALID,
              .isWeak = pEntry->isWeak,
              .code = -1,
              .state = ths->state,
              .seqNum = pEntry->seqNum,
              .term = pEntry->term,
              .currentTerm = ths->pRaftStore->currentTerm,
              .flag = 0,
          };
          ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
        }
      }

      // the entry has not been handed to the cache yet, so it is still ours to free
      syncEntryDestory(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);

    // if multi replica, start replicate right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  // pEntry is still valid here: either we own it or the cache handle pins it
  if (pRetIndex != NULL) {
    *pRetIndex = pEntry->index;
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return 0;
}
M
Minghao Li 已提交
2401

S
Shengliang Guan 已提交
2402 2403 2404
/**
 * Return the lowercase display name of a sync role.
 *
 * @param state  role to describe
 * @return "follower", "candidate" or "leader"; "error" for any other value
 */
const char* syncStr(ESyncState state) {
  const char* name = "error";

  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    name = "follower";
  } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
    name = "candidate";
  } else if (state == TAOS_SYNC_STATE_LEADER) {
    name = "leader";
  }

  return name;
}
2414

2415
/*
 * Leader-transfer handler. It is currently compiled out, and so is its call site
 * in syncNodeDoCommit.
 */
#if 0
/**
 * Act on a committed leader-transfer entry.
 *
 * Only a follower that has finished restoring acts on the entry. Its current
 * term must not exceed the entry's term, and its last log index must not
 * exceed the entry's index. If this node is the requested new leader (matched
 * by raft id or by fqdn:port), its election timer is restarted with a 1 ms
 * timeout. The FSM's FpLeaderTransferCb, when set, is then notified.
 *
 * @return always 0
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pSyncLeaderTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (sameId || sameNodeInfo) {
    // reset elect timer now so this node starts an election almost immediately
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    // raft address is an unsigned 64-bit value
    sNTrace(ths, "maybe leader transfer to %s:%d %" PRIu64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
            pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2482
/**
 * Locate this node inside a new configuration and store its slot in
 * pNewCfg->myIndex.
 *
 * Each entry's fqdn:port is converted to a raft address with this node's vgId.
 * The result is compared against this node's own raft id.
 *
 * @param ths      sync node
 * @param pNewCfg  configuration to search; myIndex is written on success
 * @return 0 if this node was found, -1 otherwise (myIndex left untouched)
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;

  while (slot < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[slot].nodeFqdn, pNewCfg->nodeInfo[slot].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }
    ++slot;
  }

  return -1;
}

2497 2498 2499 2500
/**
 * Check whether a message qualifies for the single-replica fast path.
 *
 * @return true only when the group has exactly one replica, the vgId is not 1,
 *         and syncUtilUserCommit() accepts the message type
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1 || ths->vgId == 1) {
    return false;
  }
  return syncUtilUserCommit(pMsg->msgType);
}

M
Minghao Li 已提交
2501
/**
 * Apply committed log entries in [beginIndex, endIndex] to the state machine.
 *
 * If the FSM reports a snapshot whose lastApplyIndex is at or past beginIndex,
 * the range start is moved to just after the snapshot. Each remaining entry is
 * taken from the log cache when present, otherwise read from the log store.
 * Entries that cannot be read are logged and skipped.
 *
 * For user-commit entries, FpCommitCb is invoked here unless this is a
 * single-replica group with vgId != 1 that has finished restoring; per the
 * existing comment, that case is executed outside, in syncPropose. When the
 * applied entry is the last log entry and restore has not finished yet,
 * FpRestoreFinishCb is invoked and restoreFinish is set.
 *
 * @param ths         sync node
 * @param beginIndex  first index to apply (inclusive)
 * @param endIndex    last index to apply (inclusive)
 * @param flag        passed through to the FSM as SFsmCbMeta.flag
 * @return 0 on success (including an empty range); -1 if ths is NULL
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first: those entries are already applied
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) {
      continue;
    }

    // initialized so the NULL check below never reads an indeterminate pointer
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
      sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
    } else {
      sNTrace(ths, "miss cache index:%" PRId64, i);

      int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (code != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      bool internalExecute = true;
      if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
        internalExecute = false;
      }

      sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
              TMSG_INFO(pEntry->msgType));

      // execute fsm in apply thread, or execute outside syncPropose
      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

#if 0
    // execute in pre-commit
    // leader transfer
    if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
      code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
      ASSERT(code == 0);
    }
#endif

    // restore finish
    // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore) && !ths->restoreFinish) {
      if (ths->pFsm->FpRestoreFinishCb != NULL) {
        ths->pFsm->FpRestoreFinishCb(ths->pFsm);
      }
      ths->restoreFinish = true;

      int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
      sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
    }

    rpcFreeCont(rpcMsg.pCont);
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pEntry);
    }
  }

  return 0;
}

/**
 * Check whether a raft id belongs to one of this node's replicas.
 *
 * @param ths      sync node
 * @param pRaftId  id to look up
 * @return true if any replicasId[0..replicaNum) matches pRaftId
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool member = false;
  for (int32_t slot = 0; slot < ths->replicaNum && !member; ++slot) {
    member = syncUtilSameId(&ths->replicasId[slot], pRaftId);
  }
  return member;
}

/**
 * Find the snapshot sender attached to the replica slot matching pDestId.
 *
 * Slots are scanned from the highest index down, so if several slots match,
 * the highest-indexed one is returned.
 *
 * @return the sender for that slot, or NULL when no replica matches
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return ths->senders[slot];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
2639

2640 2641
/**
 * Find the per-peer heartbeat timer for the replica slot matching pDestId.
 *
 * Slots are scanned from the highest index down, so if several slots match,
 * the highest-indexed one is returned.
 *
 * @return pointer into peerHeartbeatTimerArr, or NULL when no replica matches
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t slot = ths->replicaNum;
  while (slot-- > 0) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerHeartbeatTimerArr[slot];
    }
  }
  return NULL;
}

M
Minghao Li 已提交
2650 2651
/**
 * Find the replication state kept for the replica slot matching pDestId.
 *
 * Slots are scanned from the highest index down, so if several slots match,
 * the highest-indexed one is returned.
 *
 * @return pointer into peerStates, or NULL when no replica matches
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerStates[slot];
    }
  }
  return NULL;
}

/**
 * Decide whether an AppendEntries message should actually be sent to a peer.
 *
 * A send is suppressed when the peer's last send was for the same index
 * (prevLogIndex + 1) and happened less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago.
 *
 * @param ths      sync node
 * @param pDestId  destination replica
 * @param pMsg     message about to be sent
 * @return false if the peer is unknown or the send is a recent duplicate; true otherwise
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool recentlySent = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !(sameIndex && recentlySent);
}

M
Minghao Li 已提交
2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690
/**
 * Check whether the node is in a state that permits a change.
 *
 * Refused when any of the following holds:
 * - a change is already in progress (`changing`);
 * - the commit index is at least SYNC_INDEX_BEGIN but differs from the last log index;
 * - any peer's snapshot sender has been started.
 * Each refusal is logged with a distinct message.
 *
 * @return true if none of the blocking conditions hold
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastLogIndex = syncNodeGetLastIndex(pSyncNode);
    bool      fullyCommitted = (pSyncNode->commitIndex == lastLogIndex);
    if (!fullyCommitted) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pSender != NULL && pSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}