syncMain.c 98.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
/**
 * Create a sync node from pSyncInfo, register it in the node ref table and
 * copy the timer baselines and message callbacks from the open options.
 *
 * @return the node's rid on success, -1 if opening or registration failed.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // Registration hands out the rid used by every public syncXxx(rid) entry point.
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // Timer baselines (ping / election / heartbeat) come straight from the open options.
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
B
Benguang Zhao 已提交
88 89 90 91 92
    sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid);
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

S
Shengliang Guan 已提交
127 128 129
/**
 * A new config is acceptable when this node is part of it and the replica
 * count changes by at most one relative to the current one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
132
/**
 * Apply a new replica configuration to the node identified by rid.
 *
 * The config is validated by syncNodeCheckNewConfig(). It is then applied with
 * SYNC_INDEX_INVALID as the change index. If this node is the leader, the
 * per-peer heartbeat timers are re-initialized for the new replica set.
 *
 * @return 0 on success. Returns -1 if the rid is unknown, or if the config is
 *         rejected (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: after syncNodeRelease() the node
    // may be freed by a concurrent close, so it must not be dereferenced.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // The peer set may have changed: rebuild every heartbeat timer slot for the new replicasId[].
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
160

S
Shengliang Guan 已提交
161 162 163 164
/**
 * Dispatch an incoming sync RPC message to the handler for its msgType.
 *
 * @return the handler's result. Returns -1 if sync is not initialized or the
 *         rid is unknown. Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED
 *         for unhandled message types.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // Log before releasing: once the reference is dropped the node may be freed,
  // so pSyncNode->vgId must not be read afterwards.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
215
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
216
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
217
  if (pSyncNode == NULL) return -1;
218

S
Shengliang Guan 已提交
219
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
220
  syncNodeRelease(pSyncNode);
221 222 223
  return ret;
}

224
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
225
  SSyncNode* pNode = syncNodeAcquire(rid);
226
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
227 228

  SRpcMsg rpcMsg = {0};
229
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
230 231 232
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
233
  if (ret == 1) {
234
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
235
    rpcSendResponse(&rpcMsg);
236 237
    return 0;
  } else {
238
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
239
    return -1;
240
  }
S
Shengliang Guan 已提交
241 242
}

M
Minghao Li 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
/**
 * Smallest match index among all peers of pSyncNode.
 *
 * @return SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    lowest = (current < lowest) ? current : lowest;
  }
  return lowest;
}

259
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
260
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
261
  if (pSyncNode == NULL) {
262
    sError("sync begin snapshot error");
263 264
    return -1;
  }
265

266 267
  int32_t code = 0;

M
Minghao Li 已提交
268
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
269 270 271
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
272 273 274
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
275 276 277
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
278 279
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
280
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
281 282 283
      return 0;
    }

M
Minghao Li 已提交
284 285 286
    goto _DEL_WAL;

  } else {
287 288 289 290 291 292 293 294 295 296 297 298
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
299 300 301 302 303 304 305 306 307 308
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
309 310 311 312
            sNTrace(pSyncNode,
                    "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                    " of dnode:%d, do not delete wal",
                    lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
M
Minghao Li 已提交
313

S
Shengliang Guan 已提交
314
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
315 316 317 318 319 320
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
321 322 323
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
324
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
325 326 327 328
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
329
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
330
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
331 332 333
        return 0;

      } else {
S
Shengliang Guan 已提交
334
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
335
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
336 337 338 339 340 341 342 343 344
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
345 346 347
    }
  }

M
Minghao Li 已提交
348
_DEL_WAL:
349

M
Minghao Li 已提交
350
  do {
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
371

M
Minghao Li 已提交
372
      } else {
373 374
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
375
      }
376
    }
M
Minghao Li 已提交
377
  } while (0);
378

S
Shengliang Guan 已提交
379
  syncNodeRelease(pSyncNode);
380 381 382 383
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
384
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
385
  if (pSyncNode == NULL) {
386
    sError("sync end snapshot error");
387 388 389
    return -1;
  }

390 391 392 393
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
394
    if (code != 0) {
395
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
396
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
397 398
      return -1;
    } else {
S
Shengliang Guan 已提交
399
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
400 401
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
402
  }
403

S
Shengliang Guan 已提交
404
  syncNodeRelease(pSyncNode);
405 406 407
  return code;
}

M
Minghao Li 已提交
408
/**
 * Run syncNodeStepDown(newTerm) on the node identified by rid.
 *
 * @return 0 on success, -1 if the rid is unknown.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

420
/**
 * Whether the node may serve reads.
 *
 * The node must be the leader. It is ready if restore has finished. Otherwise
 * it is ready only when the apply queue is empty and the last log entry is a
 * TDMT_SYNC_NOOP entry from the current term.
 *
 * On false, terrno is set to:
 * - TSDB_CODE_SYN_INTERNAL_ERROR if pSyncNode is NULL;
 * - TSDB_CODE_SYN_NOT_LEADER if the node is not the leader;
 * - TSDB_CODE_SYN_RESTORING otherwise.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  bool ready = false;
  // The log is only inspected once the apply queue has drained (short-circuit keeps the original call order).
  if (pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm) &&
      !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
    int32_t         code = 0;

    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
      pSyncNode->pLogStore->cacheHit++;
      if (pEntry != NULL) {
        sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
      }
    } else {
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
      code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
    }

    if (code == 0 && pEntry != NULL && pEntry->originalRpcType == TDMT_SYNC_NOOP &&
        pEntry->term == pSyncNode->pRaftStore->currentTerm) {
      ready = true;
    }

    // Always hand the cache handle back. Otherwise free any entry loaded from
    // the store, even when syncLogGetEntry() reported an error.
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else if (pEntry != NULL) {
      syncEntryDestroy(pEntry);
    }
  }

  if (!ready) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }

  return ready;
}

/** rid-based wrapper around syncNodeIsReadyForRead(); false if the rid is unknown. */
bool syncIsReadyForRead(int64_t rid) {
  bool       isReady = false;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    isReady = syncNodeIsReadyForRead(pNode);
    syncNodeRelease(pNode);
  } else {
    sError("sync ready for read error");
  }

  return isReady;
}
M
Minghao Li 已提交
495

496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
/** rid-based wrapper around syncNodeSnapshotSending(); false if the rid is unknown. */
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }

  return sending;
}

/** rid-based wrapper around syncNodeSnapshotRecving(); false if the rid is unknown. */
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);

  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }

  return receiving;
}

M
Minghao Li 已提交
518 519
/**
 * Transfer leadership to a peer.
 *
 * The target is peer 0, unless there are exactly two peers and peer 1 has the
 * higher match index. Only a leader with more than one replica transfers;
 * otherwise this is a no-op returning 0.
 *
 * @return -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when there are no peers;
 *         otherwise 0 or the result of syncNodeLeaderTransferTo().
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
541 542
/**
 * Propose a leader-transfer entry that names newLeader as the next leader.
 *
 * @return -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA for a single-replica
 *         group; -1 if the message could not be built; otherwise the result
 *         of syncNodePropose().
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // A failed build leaves rpcMsg.pCont unusable, and dereferencing it below would crash.
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

563 564
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
565

S
Shengliang Guan 已提交
566
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
567 568 569
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
570 571 572 573 574
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
575
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
576 577
  }

578
  return state;
M
Minghao Li 已提交
579 580
}

581
#if 0
582 583 584 585 586
/**
 * Fill pSnapshot with the term of the entry at index, index itself, and the
 * latest config index not past it. (Compiled out by the surrounding #if 0.)
 *
 * @return 0 on success; -1 if index is below SYNC_INDEX_BEGIN, the rid is
 *         unknown, or the entry cannot be read.
 */
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry) != 0) {
    // The store may still hand back an entry on failure; it is ours to free.
    if (pEntry != NULL) syncEntryDestroy(pEntry);
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestroy(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

614
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
615
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
616 617 618
  if (pSyncNode == NULL) {
    return -1;
  }
619
  ASSERT(rid == pSyncNode->rid);
620
  sMeta->lastConfigIndex = pSyncNode->raftCfg.lastConfigIndex;
621

622
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->raftCfg.lastConfigIndex);
623

S
Shengliang Guan 已提交
624
  syncNodeRelease(pSyncNode);
625 626 627
  return 0;
}

628
/**
 * Store in sMeta the largest config index that is <= snapshotIndex.
 *
 * The search starts from configIndexArr[0]. That first value is kept even if
 * it exceeds snapshotIndex. (Compiled out by the surrounding #if 0.)
 *
 * @return 0 on success, -1 if the rid is unknown.
 */
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);
  ASSERT(pNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pNode->raftCfg.configIndexArr)[0];
  for (int32_t k = 0; k < pNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = (pNode->raftCfg.configIndexArr)[k];
    if (candidate <= snapshotIndex && candidate > best) {
      best = candidate;
    }
  }
  sMeta->lastConfigIndex = best;

  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
651
#endif
652

653
/**
 * Largest entry of raftCfg.configIndexArr that is <= snapshotLastApplyIndex.
 *
 * The search starts from configIndexArr[0], which is returned unchanged when
 * no later entry qualifies (even if it exceeds snapshotLastApplyIndex).
 * Requires at least one config index.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t k = 0; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[k];
    // Keep the newest config change that is covered by the snapshot.
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

669 670
/**
 * Fill pEpSet with the endpoints of every replica in the node's current config.
 *
 * inUse points at the replica after this node (wrapping around). An unknown
 * rid leaves pEpSet with numOfEps = 0.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  for (int32_t idx = 0; idx < pNode->raftCfg.cfg.replicaNum; ++idx) {
    SEp* pEp = &pEpSet->eps[idx];
    tstrncpy(pEp->fqdn, pNode->raftCfg.cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pNode->raftCfg.cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    // Steer the retry away from this node by starting at the next replica.
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
690
/**
 * rid-based wrapper around syncNodePropose().
 *
 * @return syncNodePropose()'s result, or -1 if the rid is unknown.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return result;
}
M
Minghao Li 已提交
701

S
Shengliang Guan 已提交
702
/**
 * Propose pMsg for replication through this node.
 *
 * Rejected with -1 and terrno set when:
 * - the node is not the leader (TSDB_CODE_SYN_NOT_LEADER);
 * - a vnode (vgId != 1) has not finished restoring (TSDB_CODE_SYN_PROPOSE_NOT_READY);
 * - heartbeat replies have timed out (TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * When syncNodeIsOptimizedOneReplica() holds, the request is processed inline.
 * pMsg->info.conn.applyIndex/applyTerm are then filled in and 1 is returned.
 *
 * Otherwise the message is registered in the response manager, wrapped as a
 * client request and enqueued via syncEqMsg. The enqueue result is returned
 * and *seq (if non-NULL) receives the response-manager sequence number.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica: handle the request inline instead of enqueuing it.
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized so the failure log never prints an indeterminate value when
    // syncNodeOnClientRequest() fails before assigning it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  // Regular path: remember the caller's RPC so a response can be matched later, then enqueue.
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
764
// Put a per-peer heartbeat timer into its idle state: no armed timer handle, zero
// counter and logic clock, period taken from the node's heartbeat baseline, callback
// bound to syncNodeEqPeerHeartbeatTimer, and destination set to destId.
// Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  SSyncTimer* pTimer = pSyncTimer;

  pTimer->destId = destId;
  pTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pTimer->timerMS = pSyncNode->hbBaseLine;
  pTimer->counter = 0;
  pTimer->pTimer = NULL;
  pTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
775
// Arm (or re-arm) the heartbeat timer that targets one peer.
//
// The timer callback is handed pData->rid, a ref id for the SSyncHbTimerData record,
// rather than a raw pointer. The record is reused when pSyncTimer->hbDataRid still
// resolves through syncHbTimerDataAcquire(). Otherwise a new one is allocated and
// registered with syncHbTimerDataAdd(). The timer fires after
// timerMS / HEARTBEAT_TICK_NUM ms.
//
// Returns 0 on success, and also 0 when the sync env is not initialized (that case is
// only logged). Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when a new record
// cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  // NOTE(review): the record returned by syncHbTimerDataAcquire() is not released in
  // this function; confirm whether a matching release is required.
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // must not touch pData->rid below on allocation failure
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
      return -1;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  // snapshot everything the callback needs into the ref-managed record
  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
801
// Disarm a per-peer heartbeat timer and drop its ref-managed callback record.
// The timer's logic clock is advanced before stopping; presumably this lets a callback
// already in flight recognize itself as stale (verify in the callback).
// Always returns 0; pSyncNode is unused.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

811
// Reconcile the log store with the FSM snapshot.
//
// The log store is left alone when its range [begin, last] still reaches the
// snapshot's last applied index, i.e. last >= snapshotVer and begin <= snapshotVer + 1.
// Otherwise it is restored from the snapshot at that index.
// Returns 0 on success, -1 if the restore fails.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  SyncIndex snapshotVer = snapshot.lastApplyIndex;

  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logCoversSnapshot = (lastVer >= snapshotVer) && (firstVer <= snapshotVer + 1);
  if (logCoversSnapshot) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
831
// open/close --------------
S
Shengliang Guan 已提交
832 833
// Allocate and initialize a sync node from pSyncInfo.
//
// Steps:
//  - make sure the sync directory exists;
//  - create raft_config.json from pSyncInfo, or load it. When loading, the input
//    syncCfg wins (and is written back) if it has replicas and differs from the stored
//    one; otherwise the stored config is copied back into pSyncInfo->syncCfg;
//  - derive the self, peer and replica raft ids;
//  - open the raft store, vote and index managers, log store and log buffer;
//  - initialize timers, snapshot senders and receiver, and bookkeeping counters.
//
// Ownership: on success the node takes over pSyncInfo->pFsm, and the field is cleared.
// Every failure path goes through _error, which frees any pFsm still held by
// pSyncInfo and tears down whatever was created so far with syncNodeClose().
//
// Returns the new node, or NULL on failure.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId before the cfg-file handling below, whose log lines read pSyncNode->vgId.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // unwind through _error so the node and the senders created so far are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  syncNodeLogReplMgrInit(pSyncNode);

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart()
  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // pFsm is still owned by pSyncInfo only if we failed before the hand-over above
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1140 1141
// Raise pSyncNode->commitIndex to the FSM snapshot's last applied index when the
// snapshot is ahead. The index is never lowered. This is a no-op when no FSM or no
// FpGetSnapshotInfo callback is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  bool snapshotAhead = snapshot.lastApplyIndex > pSyncNode->commitIndex;
  if (snapshotAhead) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1150
// Bring the log buffer's commit point up to date after the node has been opened.
//
// When the WAL is non-empty (last index != -1), its last index must sit directly
// before the log buffer's end index. Otherwise terrno is set to
// TSDB_CODE_WAL_LOG_INCOMPLETE and -1 is returned. The buffer is then committed up to
// the larger of the node's commitIndex and the log store's commit index.
// Returns 0 on success, -1 on failure.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool walEmpty = (lastVer == -1);
  if (!walEmpty && endIndex != lastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  SyncIndex targetCommit = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, targetCommit) < 0) ? -1 : 0;
}

// Start raft on an opened node, then arm the ping timer.
//
// A multi-replica node starts as a follower. A single-replica node advances the term,
// makes itself leader, and appends a no-op entry (Raft 3.6.2).
// Returns the ping-timer start result, which is also asserted to be 0.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  bool singleReplica = (pSyncNode->replicaNum == 1);

  if (!singleReplica) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
  return code;
}

// Older variant of syncNodeStart(). In the single-replica case it also calls
// syncMaybeAdvanceCommitIndex() right after appending the no-op. It returns nothing;
// the ping-timer result is only asserted.
void syncNodeStartOld(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

B
Benguang Zhao 已提交
1211
// Start a node in standby mode.
//
// The node becomes a follower, its heartbeat timers are stopped, and its election
// timer is pushed out to TIMER_MAX_MS. The ping timer is then armed.
// Returns the ping-timer start result (asserted to be 0).
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // an election timeout long enough that a standby node does not campaign on its own
  int32_t longElectMS = TIMER_MAX_MS;
  int32_t code = syncNodeRestartElectTimer(pSyncNode, longElectMS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
  return code;
}

M
Minghao Li 已提交
1227
// Quiesce a node before it is closed.
//
// Steps:
//  1. If the FSM exposes FpApplyQueueItems, poll it every 20 ms until it reports
//     0 items or -1.
//  2. Force-stop and destroy the new-node snapshot receiver.
//  3. Stop the election and heartbeat timers.
//
// A NULL node is ignored. The original code tested pSyncNode for NULL in step 1 but
// then dereferenced it unconditionally afterwards.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);
}

1257
// Free one heartbeat timer data record. Presumably this is the destructor registered
// with the hb-data ref set; verify at the registration site.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1258

M
Minghao Li 已提交
1259
// Tear down a sync node and free it. A NULL node is ignored.
// syncNodeOpen() also calls this on its failure path.
//
// All timers are stopped first, so none of them stays armed while the raft store,
// managers, log store and log buffer are destroyed. The ping timer in particular is
// armed with the raw node pointer as its callback argument.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // disarm timers before tearing down the state their callbacks may touch
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncNodeLogReplMgrDestroy(pSyncNode);
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if (pSyncNode->senders[i] != NULL) {
      sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);

      if (snapshotSenderIsStart(pSyncNode->senders[i])) {
        snapshotSenderStop(pSyncNode->senders[i], false);
      }

      snapshotSenderDestroy(pSyncNode->senders[i]);
      pSyncNode->senders[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  taosMemoryFree(pSyncNode);
}

1317
// Snapshot strategy recorded in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->raftCfg.snapshotStrategy;
  return strategy;
}
M
Minghao Li 已提交
1318

M
Minghao Li 已提交
1319 1320 1321
// timer control --------------
// Arm the ping timer: FpPingTimerCB fires after pingTimerMS with the node pointer as
// its argument. pingTimerLogicClock is then synced to pingTimerLogicClockUser.
// When the sync env is not initialized this only logs an error. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Disarm the ping timer after advancing pingTimerLogicClockUser. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire in `ms` milliseconds.
//
// Records the period in electTimerMS. Fills electTimerParam with the absolute deadline,
// the current electTimerLogicClock, and the node pointer. The callback
// (FpElectTimerCB) receives the node's ref id (pSyncNode->rid), not a pointer.
// When the sync env is not initialized this only logs an error. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Disarm the election timer. electTimerLogicClock is advanced first, so it no longer
// matches the value captured in electTimerParam when the timer was armed; presumably
// this is how an in-flight callback detects it is stale (verify in the callback).
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop, then re-arm, the election timer with a period of `ms` milliseconds.
// The results of stop/start are not inspected; this always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1376
// Re-arm the election timer with a fresh timeout.
//
// Standby nodes get TIMER_MAX_MS. All other nodes get a random value from
// syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1391
// Arm the node-level heartbeat timer: FpHeartbeatTimerCB fires after heartbeatTimerMS
// with the node pointer as its argument. heartbeatTimerLogicClock is then synced to
// heartbeatTimerLogicClockUser. When the sync env is not initialized this only logs an
// error. A trace line is emitted in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  bool envReady = syncIsInit();

  if (!envReady) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1405
// Arm the per-peer heartbeat timer for every peer that has one.
// Only per-peer timers are started here. The node-level heartbeat timer
// (syncNodeDoStartHeartbeatTimer) is not armed by this function. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer != NULL) {
      syncHbTimerStart(pSyncNode, pPeerTimer);
    }
    ++peer;
  }
  return 0;
}

M
Minghao Li 已提交
1423 1424
// Disarm the per-peer heartbeat timer for every peer that has one.
// Only per-peer timers are stopped here. The node-level heartbeat timer is not touched
// by this function. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer != NULL) {
      syncHbTimerStop(pSyncNode, pPeerTimer);
    }
    ++peer;
  }
  return 0;
}

1442 1443 1444 1445 1446 1447
// Stop, then re-arm, all per-peer heartbeat timers. Always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1448 1449 1450 1451 1452 1453 1454 1455
// Send pMsg to the peer whose raft id address equals destRaftId->addr.
//
// On success the message body is converted with syncUtilMsgHtoN(), marked noResp, and
// handed to syncSendMSg, whose result is returned.
// If no matching peer is found, or no send callback is registered, the message body is
// freed, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR, and -1 is returned.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; epSet == NULL && i < pNode->peersNum; ++i) {
    if (pNode->peersId[i].addr == destRaftId->addr) {
      epSet = &pNode->peersEpset[i];
    }
  }

  if (pNode->syncSendMSg == NULL || epSet == NULL) {
    sError("vgId:%d, sync send msg by id error, fp:%p epset:%p", pNode->vgId, pNode->syncSendMSg, epSet);
    rpcFreeCont(pMsg->pCont);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  return pNode->syncSendMSg(epSet, pMsg);
}

1469
// Report whether this node is a member of pCfg.
//
// Membership is checked two ways:
//  - by endpoint: FQDN and port equal to myNodeInfo;
//  - by raft id: SYNC_ADDR of the entry combined with this node's vgId equals myRaftId.
// The two answers are asserted to agree. The endpoint-based answer is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool foundByEp = false;
  for (int32_t i = 0; !foundByEp && i < pCfg->replicaNum; ++i) {
    foundByEp = strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort;
  }

  bool foundById = false;
  for (int32_t i = 0; !foundById && i < pCfg->replicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    foundById = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  ASSERT(foundByEp == foundById);
  return foundByEp;
}

1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
// Report whether pNewCfg differs from pOldCfg.
// A difference is any of: replica count, this node's index, or, position by position,
// a member's fqdn or port.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  bool changed = (pOldCfg->replicaNum != pNewCfg->replicaNum) || (pOldCfg->myIndex != pNewCfg->myIndex);

  for (int32_t idx = 0; !changed && idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];
    changed = (pBefore->nodePort != pAfter->nodePort) || (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) != 0);
  }

  return changed;
}

M
Minghao Li 已提交
1510
// Apply pNewConfig (effective at lastConfigChangeIndex) to pSyncNode.
//
// If the config is unchanged (see syncIsConfigChanged), this only logs and returns.
// Otherwise it:
//   - stores the new config and lastConfigIndex, bumps configChangeNum, and records the
//     config index via syncAddCfgIndex;
//   - sets isStandBy = 0 if this node is in the new config, or isStandBy = 1 if it was
//     dropped (in old but not in new).
// If this node is in the new config, it also:
//   - rebuilds self/peer/replica ids, quorum, the index managers and the vote managers;
//   - re-maps existing snapshot senders to their new replica slots, creates senders for
//     empty slots, and destroys senders that are no longer used;
//   - persists the config;
//   - re-enters its role: a leader re-becomes leader and appends a no-op entry; any
//     other role becomes a follower.
// If this node is not in the new config, the config is only persisted.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // Evaluated before myNodeInfo/myRaftId are rebuilt below.
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  sNInfo(pSyncNode, "begin do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save old replica ids and snapshot senders so senders can be re-mapped by id
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear all slots first
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // move each old sender whose replica id survives into that replica's new slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (!syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) || oldSenders[k] == NULL) {
          continue;
        }

        sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

        pSyncNode->senders[i] = oldSenders[k];
        oldSenders[k] = NULL;  // ownership moved; must not be destroyed below

        // reset replicaIndex
        int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
        pSyncNode->senders[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], true);
        break;
      }
    }

    // create new senders for every slot still empty
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] != NULL) {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
        continue;
      }

      pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
      if (pSyncNode->senders[i] == NULL) {
        // will be created later while send snapshot. The sender is NULL here, so log via the node.
        sNError(pSyncNode, "snapshot sender create failed while reconfig, replica-index:%d", i);
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders that were not re-mapped
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      if (syncNodeAppendNoop(pSyncNode) < 0) {
        sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
      }
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
}

M
Minghao Li 已提交
1683 1684 1685 1686
// raft state change --------------
// Adopt a strictly newer term.
// This persists the term, becomes follower with a reason naming the new term, and clears
// the stored vote. A term that is not newer than the current one is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1694 1695 1696 1697 1698 1699
// Raise the stored term to `term` if it is strictly newer.
// Unlike syncNodeUpdateTerm, this does not change the node's role and does not clear the vote.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) return;

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1700
// Step down to follower because of newTerm.
//  - newTerm older than the current term: ignored (trace only).
//  - newTerm newer: persist it, become follower, clear the vote.
//  - same term: become follower unless already one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = pSyncNode->pRaftStore->currentTerm;

  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (newTerm == currentTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);
  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1726 1727
// Clean up the node's pending client responses via its response manager.
// Called from syncNodeBecomeFollower.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1728
// Switch pSyncNode to the follower role.
// If it was leader, the cached leader id (itself) is cleared. The function then stops the
// heartbeat timer, re-arms the election timer, cleans pending client responses, invokes
// the FSM's become-follower callback (if any), and resets minMatchIndex and the log buffer.
// debugStr is only used in the trace log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }
  pSyncNode->hbSlowNum = 0;

  // role switch: no more heartbeats, wait on the election timer instead
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  // answer clients waiting on this node
  syncNodeLeaderChangeRsp(pSyncNode);

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1779
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1780 1781
  pSyncNode->leaderTime = taosGetTimestampMs();

1782
  pSyncNode->becomeLeaderNum++;
1783
  pSyncNode->hbrSlowNum = 0;
1784

1785 1786 1787
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1788
  // state change
M
Minghao Li 已提交
1789
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1790 1791

  // set leader cache
M
Minghao Li 已提交
1792 1793
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1794
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1795 1796
    // maybe overwrite myself, no harm
    // just do it!
1797 1798 1799 1800 1801 1802 1803

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
1804
    ASSERT(code == 0);
1805
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1806 1807
  }

S
Shengliang Guan 已提交
1808
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1809 1810
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1811 1812 1813
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1814 1815 1816
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1817
#if 0
1818 1819
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1820
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1821
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1822 1823
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
1824
      }
1825
    }
1826
    (pMySender->privateTerm) += 100;
1827
  }
M
Minghao Li 已提交
1828
#endif
1829

1830
  // close receiver
M
Minghao Li 已提交
1831 1832
  if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
      snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
1833 1834 1835
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

M
Minghao Li 已提交
1836
  // stop elect timer
M
Minghao Li 已提交
1837
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1838

M
Minghao Li 已提交
1839 1840
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1841

M
Minghao Li 已提交
1842 1843
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1844

1845 1846 1847 1848 1849
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1850 1851 1852
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

B
Benguang Zhao 已提交
1853 1854 1855
  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

M
Minghao Li 已提交
1856
  // trace log
1857
  sNInfo(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1858 1859 1860
}

// Candidate -> leader after winning a majority of votes (both asserted).
// Becomes leader, then appends a no-op entry in the new term (a failure is logged, not fatal),
// and logs term, commit index and last log index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(lastIndex >= 0);
  sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
}

// Older candidate -> leader path.
// Becomes leader, appends a no-op entry (Raft 3.6.2), tries to advance the commit index, and
// replicates right away when there is more than one replica.
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  if (pSyncNode->replicaNum <= 1) {
    return;
  }
  syncNodeReplicate(pSyncNode);
}

M
Minghao Li 已提交
1892 1893
// A sync node belongs to the mnode exactly when its vgroup id is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
1894
// Reset per-peer send progress (last send index/time) for every replica slot. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA; slot++) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }
  return 0;
}

// Follower -> candidate: only the role changes here.
// Logs term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  // fetched before logging so it is read regardless of the log level
  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower (asserts the node is currently leader).
// Logs term, commit index and last log index after the switch.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower (asserts the node is currently candidate).
// Logs term, commit index and last log index after the switch.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastLogIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1933 1934
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1935
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1936 1937
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1938 1939 1940 1941

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1942
// simulate get vote from outside
M
Minghao Li 已提交
1943
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1944
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1945

S
Shengliang Guan 已提交
1946 1947
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1948
  if (ret != 0) return;
M
Minghao Li 已提交
1949

S
Shengliang Guan 已提交
1950
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1951 1952 1953 1954 1955 1956 1957
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1958
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1959 1960
}

M
Minghao Li 已提交
1961
// return if has a snapshot
M
Minghao Li 已提交
1962 1963
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1964
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1965 1966
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1967 1968 1969 1970 1971 1972 1973
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1974 1975
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1976
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1977
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1978 1979
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1980 1981 1982 1983 1984 1985 1986
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1987 1988
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1989 1990
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1991 1992
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1993
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1994 1995
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1996 1997
    }

M
Minghao Li 已提交
1998 1999 2000
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
2001 2002 2003 2004
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
2005
  } else {
M
Minghao Li 已提交
2006 2007
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
2008
  }
M
Minghao Li 已提交
2009

M
Minghao Li 已提交
2010 2011 2012 2013 2014 2015 2016
  return lastTerm;
}

// Fill *pLastIndex / *pLastTerm with the last index and term, taking the snapshot into
// account (see syncNodeGetLastIndex / syncNodeGetLastTerm). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex index = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  term = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = index;
  *pLastTerm = term;
  return 0;
}
M
Minghao Li 已提交
2019

M
Minghao Li 已提交
2020
// return append-entries first try index
M
Minghao Li 已提交
2021 2022 2023 2024 2025
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
2026 2027
// if index > 0, return index - 1
// else, return -1
2028 2029 2030 2031 2032 2033 2034 2035 2036
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
2037 2038 2039 2040
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2041 2042 2043 2044 2045 2046 2047 2048 2049
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

2050 2051 2052
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

2053
  SSyncRaftEntry* pPreEntry = NULL;
2054 2055 2056 2057 2058 2059 2060
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

2061
    pSyncNode->pLogStore->cacheHit++;
2062 2063 2064
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
2065
    pSyncNode->pLogStore->cacheMiss++;
2066 2067 2068 2069
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2070 2071 2072 2073 2074 2075

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2076
  if (code == 0) {
2077
    ASSERT(pPreEntry != NULL);
2078
    preTerm = pPreEntry->term;
2079 2080 2081 2082

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2083
      syncEntryDestroy(pPreEntry);
2084 2085
    }

2086 2087
    return preTerm;
  } else {
2088 2089 2090 2091
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2092 2093 2094 2095
      }
    }
  }

2096
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2097
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2098 2099
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2100 2101 2102 2103

// Fill *pPreIndex / *pPreTerm with the index and term preceding `index`
// (see syncNodeGetPreIndex / syncNodeGetPreTerm). Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2108
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2109
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2110

S
Shengliang Guan 已提交
2111 2112 2113
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2114
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2115 2116
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2117
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2118 2119
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2120
    }
M
Minghao Li 已提交
2121

M
Minghao Li 已提交
2122
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2123 2124
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2125
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2126 2127
      rpcFreeCont(rpcMsg.pCont);
      return;
2128
    }
M
Minghao Li 已提交
2129

S
Shengliang Guan 已提交
2130
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2131
  }
M
Minghao Li 已提交
2132 2133
}

M
Minghao Li 已提交
2134
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2135
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2136

M
Minghao Li 已提交
2137 2138
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2139

2140
  if (pNode == NULL) return;
M
Minghao Li 已提交
2141 2142 2143 2144 2145

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2146

2147
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2148 2149 2150 2151
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2152

S
Shengliang Guan 已提交
2153
  SRpcMsg rpcMsg = {0};
2154 2155
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2156

S
Shengliang Guan 已提交
2157
  if (code != 0) {
M
Minghao Li 已提交
2158
    sError("failed to build elect msg");
M
Minghao Li 已提交
2159
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2160
    return;
M
Minghao Li 已提交
2161 2162
  }

S
Shengliang Guan 已提交
2163
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2164
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2165 2166 2167

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2168
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2169
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2170
    syncNodeRelease(pNode);
2171
    return;
M
Minghao Li 已提交
2172
  }
M
Minghao Li 已提交
2173 2174

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2175 2176
}

M
Minghao Li 已提交
2177
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2178
  if (!syncIsInit()) return;
2179

S
Shengliang Guan 已提交
2180 2181 2182 2183
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2184
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2185 2186 2187
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2188
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2189
        return;
2190
      }
M
Minghao Li 已提交
2191

2192
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2193 2194
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2195
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2196 2197
        rpcFreeCont(rpcMsg.pCont);
        return;
2198
      }
S
Shengliang Guan 已提交
2199 2200 2201 2202

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2203
    } else {
S
Shengliang Guan 已提交
2204 2205
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2206
    }
M
Minghao Li 已提交
2207 2208 2209
  }
}

2210
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2211
  int64_t hbDataRid = (int64_t)param;
2212
  int64_t tsNow = taosGetTimestampMs();
2213

2214 2215
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2216
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2217 2218
    return;
  }
2219

2220
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2221
  if (pSyncNode == NULL) {
2222
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2223
    sError("hb timer get pSyncNode NULL");
2224 2225 2226 2227 2228 2229 2230 2231
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2232
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2233 2234 2235
    return;
  }

M
Minghao Li 已提交
2236
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2237 2238
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2239
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2240 2241 2242
    return;
  }

M
Minghao Li 已提交
2243
  if (pSyncNode->pRaftStore == NULL) {
2244 2245
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2246
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2247 2248 2249
    return;
  }

M
Minghao Li 已提交
2250
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2251 2252

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2253 2254 2255
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2256
    if (timerLogicClock == msgLogicClock) {
2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2277
        pSyncMsg->timeStamp = tsNow;
2278 2279 2280 2281 2282 2283

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2284 2285
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2286 2287 2288 2289 2290 2291 2292 2293
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2294 2295
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2296 2297
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2298 2299 2300 2301
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2302
    } else {
M
Minghao Li 已提交
2303 2304
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2305 2306
    }
  }
2307 2308 2309

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2310 2311
}

2312 2313 2314 2315 2316
/*
 * Propose a noop entry through the message queue (leader only).
 *
 * Builds a noop raft entry at the log store's write index and current term, wraps it in
 * a client-request message and enqueues it with pNode->syncEqMsg.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_SYN_NOT_LEADER when this node
 * is not the leader; otherwise returns the builder's or enqueue's non-zero code.
 */
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  // Only a leader may propose; the original test was inverted and rejected the leader.
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestroy(pEntry);  // the request message carries its own copy from here on
  if (code != 0) {
    sError("vgId:%d, failed to build noop client request since %s", pNode->vgId, terrstr());
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    // not accepted by the queue: free the payload, as the other enqueue sites do
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2336 2337
/* LRU deleter callback: frees the cached value; key and keyLen are not used. */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2338 2339 2340 2341
/*
 * Insert pEntry into the log store's LRU cache, keyed by the entry's index.
 *
 * The charge is the entry header plus its payload. The entry is inserted at low priority
 * with deleteCacheEntry as its deleter, and *h receives the cache handle.
 * Returns 0 when the cache reports TAOS_LRU_STATUS_OK, otherwise -1.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2353
/*
 * Append pEntry to the node's log buffer and advance the match index.
 *
 * pEntry is destroyed here when it is rejected:
 *   - payload shorter than SMsgHead;
 *   - syncLogBufferAppend() fails. In that case terrno is set to TSDB_CODE_SYN_BUFFER_FULL
 *     and the FSM is notified with that code first.
 * Both cases return -1.
 *
 * With more than one replica, the function returns 0 right after proceeding the buffer.
 * With a single replica, it also updates the commit index and commits the buffer up to
 * it, returning -1 if that commit fails.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  int32_t appended = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appended < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
                            TSDB_CODE_SYN_BUFFER_FULL);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // move the match index forward, replicating where needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  bool singleReplica = !(ths->replicaNum > 1);
  if (!singleReplica) {
    return 0;
  }

  // single replica: this node alone forms the quorum, so commit immediately
  (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);
  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) >= 0) {
    return 0;
  }

  sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  return -1;
}

2395
/*
 * Return true when at least `quorum` peers have not replied to heartbeats for longer than
 * tsHeartbeatTimeout ms.
 *
 * Peers whose recorded receive time is 0 or -1 are skipped (never heard from / unknown).
 * Always false for a single-replica group.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int64_t now = taosGetTimestampMs();
  int32_t staleCount = 0;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    bool    hasRecvTime = (lastRecv != 0) && (lastRecv != -1);
    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      ++staleCount;
    }
  }

  return staleCount >= pSyncNode->quorum;
}

2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436
/* True if any replica slot has a snapshot sender whose `start` flag is set; false for NULL. */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->replicaNum; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender != NULL && pSender->start) return true;
  }
  return false;
}

/* True if the node's new-node snapshot receiver exists and has its `start` flag set. */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2437
/*
 * Append a noop entry at the log buffer's end index with the current term.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built.
 * Otherwise returns syncNodeAppend()'s result. The original computed that result and
 * then discarded it, always reporting success.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend destroys pEntry itself on its rejection paths, so no cleanup here.
  return syncNodeAppend(ths, pEntry);
}

/*
 * Legacy noop append through the log store (pre log-buffer path).
 *
 * Builds a noop entry at the log store's write index. When this node is leader, the
 * entry is appended to the log store and cached. The local reference is then dropped:
 * through the cache handle if the entry was cached, otherwise by destroying it.
 *
 * Returns 0 on success, or -1 if the entry cannot be built (terrno =
 * TSDB_CODE_OUT_OF_MEMORY) or cannot be appended.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    // report allocation failure instead of asserting, consistent with syncNodeAppendNoop
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      // not cached yet, so we still own the entry; previously it leaked here
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2480 2481
/*
 * Handle a SyncHeartbeat from the leader and send back a SyncHeartbeatReply.
 *
 * Same term and this node is not leader:
 *   - record the receive time;
 *   - reset the election timer;
 *   - adopt the leader's minMatchIndex;
 *   - if follower, enqueue a SYNC_LOCAL_CMD_FOLLOWER_CMT with the leader's commit index.
 * Term >= current term and this node is not follower: enqueue a SYNC_LOCAL_CMD_STEP_DOWN
 * carrying the heartbeat's term.
 *
 * Returns 0, or -1 if the reply message cannot be built.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0) {
    sError("vgId:%d, failed to build heartbeat reply", ths->vgId);
    return -1;
  }

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, failed to build fc-commit local cmd", ths->vgId);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;
        // copy before enqueueing: the message belongs to the queue afterwards
        SyncIndex fcIndex = pSyncMsg->fcIndex;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          // nowhere to enqueue: release the built message instead of leaking it
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, failed to build step-down local cmd", ths->vgId);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;
      // copy before enqueueing: reading pSyncMsg after a successful enqueue may touch
      // memory the consumer has already freed
      SyncTerm sdNewTerm = pSyncMsg->sdNewTerm;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, sdNewTerm);
        }
      } else {
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2562
/*
 * Handle a SyncHeartbeatReply from a peer.
 *
 * Looks up the peer's log replication manager, logs the reply with its round-trip delay,
 * records the receive time in pMatchIndex, and delegates to
 * syncLogReplMgrProcessHeartbeatReply(). Returns -1 if no manager exists for the sender.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "0x%016" zero-pads the address to 16 hex digits; the old "0x016%" printed a literal "016"
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2582
/*
 * Legacy heartbeat-reply handler: log the reply and record when it arrived.
 * The recorded time (in pMatchIndex) is what the liveness checks read. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  const STraceId* pTrace = &pRpcMsg->info.traceId;
  char            traceStr[40] = {0};
  TRACE_TO_STR(pTrace, traceStr);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, traceStr);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}

S
Shengliang Guan 已提交
2598 2599
/*
 * Execute a locally enqueued SyncLocalCmd.
 *
 *   STEP_DOWN     -> syncNodeStepDown() with the carried term.
 *   FOLLOWER_CMT  -> update the commit index to fcIndex and commit the log buffer up to it
 *                    (a commit failure is only logged).
 *   anything else -> logged as an error.
 * Always returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->sdNewTerm);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    sError("error local cmd");
    return 0;
  }

  (void)syncNodeUpdateCommitIndex(ths, pCmd->fcIndex);
  int32_t commitRet = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex);
  if (commitRet < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }
  return 0;
}

/*
 * Legacy local-command handler.
 * STEP_DOWN steps down to the carried term; FOLLOWER_CMT goes through
 * syncNodeFollowerCommit(); anything else is logged. Always returns 0.
 */
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->sdNewTerm);
    return 0;
  }

  if (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pCmd->fcIndex);
    return 0;
  }

  sError("error local cmd");
  return 0;
}

M
Minghao Li 已提交
2635 2636 2637 2638 2639 2640 2641 2642 2643 2644
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2645

2646
/*
 * Turn a client request into a raft entry and append it (leader only).
 *
 * The entry is built at the log buffer's end index with the current term: from the
 * client-request payload for TDMT_SYNC_CLIENT_REQUEST, otherwise from the RPC message.
 *
 * On the leader, *pRetIndex (if given) receives that index and the entry goes to
 * syncNodeAppend(), whose result is returned.
 * On any other node the entry is destroyed and -1 is returned with
 * terrno = TSDB_CODE_SYN_NOT_LEADER; previously terrno was left unset.
 * Returns -1 if the entry cannot be built.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  // syncNodeAppend destroys pEntry itself when it rejects it
  int32_t code = syncNodeAppend(ths, pEntry);
  if (code < 0) {
    sNError(ths, "failed to append blocking msg");
  }
  return code;
}

2683 2684
/*
 * Legacy client-request path through the log store and LRU cache.
 *
 * Builds an entry at the log store's write index. On the leader:
 *   - append it to the log store and cache it;
 *   - with more than one replica, start replication;
 *   - with exactly one replica, advance the commit index directly (mnode vs vnode path).
 * If the append fails:
 *   - single replica: return -1;
 *   - multi replica: first call FpCommitCb with code -1 so the waiting caller is answered.
 * *pRetIndex (if given) receives the entry index. The local entry reference is released
 * before returning.
 *
 * Returns 0 on success. Returns -1 when the append fails, or when the entry cannot be
 * built (previously a NULL entry was dereferenced on the leader path).
 */
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }

      // the entry has not been cached yet (h is still NULL), so it is ours to destroy
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);

    if (ths->replicaNum > 1) {
      // multi replica: start replicating right now
      syncNodeReplicate(ths);
    } else if (ths->replicaNum == 1) {
      // only myself: maybe commit right now
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = pEntry->index;
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
M
Minghao Li 已提交
2772

S
Shengliang Guan 已提交
2773 2774 2775
/* Map a sync state to a static, human-readable name; unrecognised values give "unknown". */
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    default:
      break;
  }

  return name;
}
2789

2790
#if 0
/*
 * Disabled (not compiled): handling of a committed leader-transfer entry.
 *
 * Runs only on a follower that has finished restoring and whose log is up to date with
 * the entry. If this node is the transfer target (same raft id, or same fqdn:port), its
 * election timer is restarted at 1 ms. In every accepted case the FSM's
 * FpLeaderTransferCb (if set) is then invoked.
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool isTargetById = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool isTargetByAddr = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                        pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (isTargetById || isTargetByAddr) {
    // reset elect timer now!
    int32_t ret = syncNodeRestartElectTimer(ths, 1);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2857
/*
 * Set pNewCfg->myIndex to the position of this node in the new config.
 * Returns 0 when found; returns -1, leaving myIndex untouched, otherwise.
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate = {.addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]), .vgId = ths->vgId};
    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }
  return -1;
}

2873 2874 2875 2876
/* True for a single-replica, non-vgId-1 group when the message type is a user commit. */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2877
/*
 * Legacy commit loop: apply log entries [beginIndex, endIndex] to the FSM.
 *
 * NOTE(review): starts with ASSERT(false), so this path is presumably not expected to be
 * reached any more. Confirm before relying on it.
 *
 * If the FSM snapshot already covers beginIndex, the range starts after the snapshot.
 * For each index, the entry is taken from the LRU cache or read from the log store;
 * entries that cannot be read are logged and skipped.
 * User-commit entries go to FpCommitCb, except on a restored single-replica vnode, where
 * they are executed outside. Reaching the log store's last index for the first time
 * triggers FpRestoreFinishCb and sets restoreFinish.
 *
 * Returns 0, or -1 when ths is NULL.
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  ASSERT(false);
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  int32_t code = 0;

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i == SYNC_INDEX_INVALID) continue;

      // Initialise to NULL: the NULL check below must not read an indeterminate
      // pointer when syncLogGetEntry fails without writing it.
      SSyncRaftEntry* pEntry = NULL;
      SLRUCache*      pCache = ths->pLogStore->pCache;
      LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
      if (h) {
        pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

        ths->pLogStore->cacheHit++;
        sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
      } else {
        ths->pLogStore->cacheMiss++;
        sNTrace(ths, "miss cache index:%" PRId64, i);

        code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
        if (code != 0 || pEntry == NULL) {
          sNError(ths, "get log entry error");
          sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
          continue;
        }
      }

      SRpcMsg rpcMsg = {0};
      syncEntry2OriginalRpc(pEntry, &rpcMsg);

      sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

      // user commit
      if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
        bool internalExecute = true;
        if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
          internalExecute = false;
        }

        sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
                TMSG_INFO(pEntry->msgType));

        // execute fsm in apply thread, or execute outside syncPropose
        if (internalExecute) {
          SFsmCbMeta cbMeta = {
              .index = pEntry->index,
              .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
              .isWeak = pEntry->isWeak,
              .code = 0,
              .state = ths->state,
              .seqNum = pEntry->seqNum,
              .term = pEntry->term,
              .currentTerm = ths->pRaftStore->currentTerm,
              .flag = flag,
          };

          syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
          ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
        }
      }

      // restore finish
      // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
      if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
        if (ths->restoreFinish == false) {
          if (ths->pFsm->FpRestoreFinishCb != NULL) {
            ths->pFsm->FpRestoreFinishCb(ths->pFsm);
          }
          ths->restoreFinish = true;

          int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
          sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
        }
      }

      rpcFreeCont(rpcMsg.pCont);
      if (h) {
        taosLRUCacheRelease(pCache, h, false);
      } else {
        syncEntryDestroy(pEntry);
      }
    }
  }
  return 0;
}

/* True if pRaftId matches any entry of the node's replica id list. */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t idx = 0; idx < ths->replicaNum && !found; ++idx) {
    found = syncUtilSameId(&ths->replicasId[idx], pRaftId);
  }
  return found;
}

/*
 * Return the snapshot sender of the replica slot whose id equals pDestId, or NULL.
 * Scanning from the back and stopping at the first hit selects the same slot as the
 * original forward scan, which kept the last match.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return ths->senders[slot];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
3018

3019 3020
/*
 * Return the per-peer heartbeat timer of the replica slot matching pDestId, or NULL.
 * The backwards scan picks the last matching slot, as the original forward scan did.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerHeartbeatTimerArr[slot];
    }
  }
  return NULL;
}

M
Minghao Li 已提交
3029 3030
/*
 * Return the peer state of the replica slot matching pDestId, or NULL.
 * The backwards scan picks the last matching slot, as the original forward scan did.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      return &ths->peerStates[slot];
    }
  }
  return NULL;
}

/*
 * Decide whether an AppendEntries for pMsg should go to pDestId.
 *
 * Suppresses a resend of the same next index (prevLogIndex + 1) within
 * SYNC_APPEND_ENTRIES_TIMEOUT_MS of the previous send. Returns false (with an error log)
 * when the destination has no peer state.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  if (pPeer->lastSendIndex != nextSendIndex) {
    return true;
  }
  return now - pPeer->lastSendTime >= SYNC_APPEND_ENTRIES_TIMEOUT_MS;
}

M
Minghao Li 已提交
3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069
/*
 * Check whether a membership change may start now.
 *
 * Refused (with a log line) when:
 *   - a change is already in progress;
 *   - the commit index is valid but lags the last log index;
 *   - any peer has a snapshot sender running.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasValidCommit = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasValidCommit && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    bool                 sending = (pSender != NULL) && pSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}