syncMain.c 98.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
// Opens a sync node from pSyncInfo, registers it through syncNodeAdd() and copies the
// ping / election / heartbeat settings and the message callbacks from pSyncInfo into the node.
// Returns the node's reference id, or -1 when the node cannot be opened or registered.
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: close the node that was just opened
    syncNodeClose(pNode);
    return -1;
  }

  // ping settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;
  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
B
Benguang Zhao 已提交
88 89 90 91 92
    sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid);
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

S
Shengliang Guan 已提交
127 128 129
// A new config is accepted only when syncNodeInConfig() holds for it and its replica
// count differs from the node's current replica count by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
  return replicaDelta <= 1;
}

S
Shengliang Guan 已提交
132
// Applies a new replica configuration to the node behind rid.
//
// The config is first validated with syncNodeCheckNewConfig(); an invalid config sets
// terrno to TSDB_CODE_SYN_NEW_CONFIG_ERROR and returns -1. Otherwise the new config index
// is recorded, the config change is performed, and, if the node is currently leader, the
// per-peer heartbeat timers are re-initialised between a stop and a start of the
// heartbeat timer.
//
// Returns 0 on success, -1 when rid cannot be acquired or the config is rejected.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: the node must not be dereferenced after
    // syncNodeRelease(). terrno is set last so the calls above cannot overwrite it.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    // syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
160

S
Shengliang Guan 已提交
161 162 163 164
// Dispatches pMsg to the handler that matches pMsg->msgType and returns that handler's
// result. Returns -1 when the sync environment is not initialised, when rid cannot be
// acquired, or when the message type is not one of the handled sync message types.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  if (!syncIsInit()) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }

  int32_t code = -1;
  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotReply(pNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pNode, pMsg);
      break;
    default:
      // unknown message type: report it and fail
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pNode->vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      code = -1;
      break;
  }

  syncNodeRelease(pNode);
  return code;
}

S
Shengliang Guan 已提交
212
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
213
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
214
  if (pSyncNode == NULL) return -1;
215

S
Shengliang Guan 已提交
216
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
217
  syncNodeRelease(pSyncNode);
218 219 220
  return ret;
}

S
Shengliang Guan 已提交
221 222 223 224 225
void syncSendTimeoutRsp(int64_t rid, int64_t seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  SRpcMsg rpcMsg = {0};
226
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
227 228 229
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
230 231 232 233 234
  if (ret == 1) {
    sInfo("send response since sync timeout, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
          rpcMsg.info.ahandle);
    rpcSendResponse(&rpcMsg);
  }
S
Shengliang Guan 已提交
235 236
}

M
Minghao Li 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
// Returns the smallest match index stored in pMatchIndex over all peers of the node,
// or SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    if (current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

253
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
254
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
255
  if (pSyncNode == NULL) {
256
    sError("sync begin snapshot error");
257 258
    return -1;
  }
259

260 261
  int32_t code = 0;

M
Minghao Li 已提交
262
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
263 264 265
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
266 267 268
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
269 270 271
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
272 273
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
274
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
275 276 277
      return 0;
    }

M
Minghao Li 已提交
278 279 280
    goto _DEL_WAL;

  } else {
281 282 283 284 285 286 287 288 289 290 291 292
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
307 308 309 310
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
311 312
            } while (0);

S
Shengliang Guan 已提交
313
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
314 315 316 317 318 319
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
320 321 322
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
323
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
324 325 326 327
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
328
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
329
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
330 331 332
        return 0;

      } else {
S
Shengliang Guan 已提交
333
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
334
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
335 336 337 338 339 340 341 342 343
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
344 345 346
    }
  }

M
Minghao Li 已提交
347
_DEL_WAL:
348

M
Minghao Li 已提交
349
  do {
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
370

M
Minghao Li 已提交
371
      } else {
372 373
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
374
      }
375
    }
M
Minghao Li 已提交
376
  } while (0);
377

S
Shengliang Guan 已提交
378
  syncNodeRelease(pSyncNode);
379 380 381 382
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
383
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
384
  if (pSyncNode == NULL) {
385
    sError("sync end snapshot error");
386 387 388
    return -1;
  }

389 390 391 392
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
393
    if (code != 0) {
394
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
395
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
396 397
      return -1;
    } else {
S
Shengliang Guan 已提交
398
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
399 400
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
401
  }
402

S
Shengliang Guan 已提交
403
  syncNodeRelease(pSyncNode);
404 405 406
  return code;
}

M
Minghao Li 已提交
407
// Public wrapper around syncNodeStepDown(): makes the node behind rid step down with
// newTerm. Returns 0, or -1 when rid cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);
  return 0;
}

419
// Reports whether the node can serve reads.
//
// A NULL node or a non-leader node is never ready (terrno is set accordingly). A leader
// with restoreFinish set is ready. Otherwise the leader is ready only when its apply
// queue is empty and the last log entry (taken from the log store cache or, on a miss,
// from the log store) is a TDMT_SYNC_NOOP entry of the current term. When that check
// fails terrno is set to TSDB_CODE_SYN_RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  bool ready = false;
  bool applyQueueEmpty = pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm);

  if (applyQueueEmpty && !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      hCache = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
    SSyncRaftEntry* pLast = NULL;
    int32_t         code = 0;

    if (hCache != NULL) {
      pLast = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hCache);

      pSyncNode->pLogStore->cacheHit++;
      sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pLast->bytes, pLast);
    } else {
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);

      code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pLast);
    }

    if (code == 0 && pLast != NULL) {
      bool isNoop = (pLast->originalRpcType == TDMT_SYNC_NOOP);
      if (isNoop && pLast->term == pSyncNode->pRaftStore->currentTerm) {
        ready = true;
      }

      // a cached entry is handed back to the cache, a fetched entry is destroyed here
      if (hCache != NULL) {
        taosLRUCacheRelease(pCache, hCache, false);
      } else {
        syncEntryDestroy(pLast);
      }
    }
  }

  if (!ready) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }
  return ready;
}

// Public wrapper around syncNodeIsReadyForRead(); returns false when rid cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync ready for read error");
    return false;
  }

  bool canRead = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);
  return canRead;
}
M
Minghao Li 已提交
494

495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
// Returns the result of syncNodeSnapshotSending() for the node behind rid,
// or false when rid cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);
  return sending;
}

// Returns the result of syncNodeSnapshotRecving() for the node behind rid,
// or false when rid cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);
  return receiving;
}

M
Minghao Li 已提交
517 518
// Starts a leader transfer from this node to one of its peers.
//
// With no peers the call fails with TSDB_CODE_SYN_ONE_REPLICA. If the node is not leader
// or has at most one replica, nothing is done and 0 is returned. Otherwise the target is
// the first peer, except that with exactly two peers the second one is chosen when its
// match index is strictly greater than the first one's.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
540 541
// Proposes a leader-transfer message that names newLeader as the next leader.
//
// Fails with TSDB_CODE_SYN_ONE_REPLICA when the node has a single replica. Also returns
// -1 when the transfer message cannot be built, instead of dereferencing a missing
// message body. Otherwise returns the result of syncNodePropose(). The message content
// built here is freed before returning.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  if (code != 0 || rpcMsg.pCont == NULL) {
    // building the message failed: there is no body to fill in or to propose
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

562 563
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
564

S
Shengliang Guan 已提交
565
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
566 567 568
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
569 570 571 572 573
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
574
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
575 576
  }

577
  return state;
M
Minghao Li 已提交
578 579
}

580
#if 0
581 582 583 584 585
// Fills pSnapshot from the log entry stored at index: apply index, the entry's term and
// the matching config index. Returns -1 when index is below SYNC_INDEX_BEGIN, rid cannot
// be acquired, or the entry cannot be read.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry) != 0) {
    // a failed lookup may still have handed back an entry
    if (pEntry != NULL) {
      syncEntryDestroy(pEntry);
    }
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestroy(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

613
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
614
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
615 616 617
  if (pSyncNode == NULL) {
    return -1;
  }
618
  ASSERT(rid == pSyncNode->rid);
619 620
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

S
Shengliang Guan 已提交
621
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
622

S
Shengliang Guan 已提交
623
  syncNodeRelease(pSyncNode);
624 625 626
  return 0;
}

627
// Stores in sMeta->lastConfigIndex the largest entry of configIndexArr that does not
// exceed snapshotIndex; the first entry is used when no other entry qualifies.
// Returns 0, or -1 when rid cannot be acquired.
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];

  for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[i];
    if (candidate > best && candidate <= snapshotIndex) {
      best = candidate;
    }
  }

  sMeta->lastConfigIndex = best;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
650
#endif
651

652
// Returns the largest entry of pRaftCfg->configIndexArr that does not exceed
// snapshotLastApplyIndex; the first entry is returned when no other entry qualifies.
// Asserts that at least one config index is recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->pRaftCfg->configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[pos];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

668 669
// Fills pEpSet with the fqdn/port of every replica in the node's raft config and points
// inUse at the replica after this node's own index (wrapping around).
// pEpSet->numOfEps is left at 0 when rid cannot be acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pNode->pRaftCfg->cfg.replicaNum; ++idx) {
    SEp* pDst = &pEpSet->eps[idx];
    tstrncpy(pDst->fqdn, pNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pDst->port = (pNode->pRaftCfg->cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pDst->fqdn, pDst->port);
  }

  // guard the modulo against an empty endpoint set
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
689
// Public wrapper around syncNodePropose() for the node behind rid.
// Returns -1 when rid cannot be acquired, otherwise the wrapped call's result.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return code;
}
M
Minghao Li 已提交
700

S
Shengliang Guan 已提交
701
// Proposes pMsg on this node.
//
// Rejections (return -1 with terrno set):
//   - node is not leader                               -> TSDB_CODE_SYN_NOT_LEADER
//   - restore not finished and vgId is not 1           -> TSDB_CODE_SYN_PROPOSE_NOT_READY
//   - syncNodeHeartbeatReplyTimeout() reports a timeout -> TSDB_CODE_SYN_PROPOSE_NOT_READY
//
// Optimized one-replica path: the request is handled inline through
// syncNodeOnClientRequest(); on success the apply index/term are written into
// pMsg->info.conn and 1 is returned, on failure terrno is TSDB_CODE_SYN_INTERNAL_ERROR
// and -1 is returned. seq is not written on this path.
//
// Regular path: a response stub is registered, a client request is built and enqueued
// through syncEqMsg. A build failure returns -1 without writing seq; otherwise *seq (if
// seq is non-NULL) receives the stub's sequence number and the enqueue result is
// returned. The stub is removed again when building or enqueueing fails.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised so the failure log below never reads an indeterminate value when the
    // handler fails before assigning the index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // regular path: remember the original message so a response can be sent later
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
763
// Put a per-peer heartbeat timer slot into its initial, not-yet-started state.
//   pSyncNode  - owning node; supplies the heartbeat base interval (hbBaseLine)
//   pSyncTimer - timer slot to initialise
//   destId     - raft id stored in the slot as the timer's destination
// Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // identity and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // no timer handle yet, nothing counted so far
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;

  // timing parameters
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
774
// Arm (or re-arm) the heartbeat timer of one peer.
//   pSyncNode  - owning node; its rid is stored in the timer data record
//   pSyncTimer - per-peer timer slot to start
// The timer data record referenced by pSyncTimer->hbDataRid is reused when it can
// be acquired; otherwise a new record is allocated and registered.
// Returns 0 when the timer was armed or when the sync environment is not
// initialised (only an error is logged in that case, as before), and -1 with
// terrno set when the timer data record cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // the allocation result used to be dereferenced unchecked
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    // snapshot of the timer state handed to the callback through the record
    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    // the timer ticks HEARTBEAT_TICK_NUM times per heartbeat interval; the record id is the callback parameter
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
800
// Stop the heartbeat timer of one peer and drop its timer data record.
// pSyncNode is not used here. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // advance the logic clock first (presumably so that callbacks carrying the old value are ignored)
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

810
// Restore the log store from the FSM snapshot when the two are out of step.
// The snapshot's lastApplyIndex is compared with the log store's begin and last
// index; the log store is restored from the snapshot when the log ends before
// that index or begins more than one entry after it.
// Returns 0 on success (including when nothing had to be done) and -1 when the
// snapshot info cannot be read or the restore fails.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  // zero-initialised like in syncNodeOpen, so no field is indeterminate if the callback leaves it untouched
  SSnapshot snapshot = {0};
  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
    return -1;
  }

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // log and snapshot already line up: nothing to restore
  if (lastVer >= commitIndex && firstVer <= commitIndex + 1) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
832
// open/close --------------
S
Shengliang Guan 已提交
833 834
// Allocate and initialise a sync node from the caller supplied options.
//
// Steps, in order:
//   1. make sure the sync directory and raft_config.json exist and reconcile the
//      configuration in the file with the one passed in pSyncInfo->syncCfg
//   2. copy paths and callbacks from pSyncInfo
//   3. create the log ring buffer and open the raft config
//   4. derive the node's own id, its peers and the replica id table
//   5. take over pSyncInfo->pFsm (pSyncInfo->pFsm is set to NULL)
//   6. open the raft store and create vote managers, index managers and log store
//   7. seed commitIndex from the FSM snapshot and restore the log store if needed
//   8. initialise timers, response manager, snapshot senders/receiver, replication
//      manager, peer state and the log buffer
//
// Returns the new node, or NULL on failure. On failure everything created so far
// is released through syncNodeClose() and pSyncInfo->pFsm, if still owned by the
// caller's struct, is freed and cleared.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set the vgId first: the log messages below print it, and it used to be still 0 at this point
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
      // pRaftCfg is not open yet at this point (it used to be dereferenced while NULL),
      // so read the configuration back from the file that was just written
      pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
      if (pSyncNode->pRaftCfg == NULL) {
        sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
      raftCfgClose(pSyncNode->pRaftCfg);
      pSyncNode->pRaftCfg = NULL;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // only needed for the reconciliation above; reopened further down
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // ownership of the FSM moves to the node here; from now on syncNodeClose() frees it
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts as invalid and is raised to the snapshot's lastApplyIndex when there is one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // NOTE(review): the result is stored and traced without a NULL check
    (pSyncNode->senders)[i] = pSender;
    sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  syncNodeLogReplMgrInit(pSyncNode);

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
  sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
         tsHeartbeatInterval, tsHeartbeatTimeout);

  return pSyncNode;

_error:
  // the FSM is freed here only while pSyncInfo still owns it; after the hand-over syncNodeClose() frees it
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1152 1153 1154 1155
// Raise the node's commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing when no FSM or no FpGetSnapshotInfo callback
// is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot;
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);

  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1163
// Check that the log ring buffer lines up with the log store and commit the
// buffer up to the larger of the node's commitIndex and the log store's commit
// index.
// Returns 0 on success; -1 with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the
// buffer's endIndex does not directly follow the store's last index, or -1 when
// committing the buffer fails.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool storeHasEntries = (lastVer != -1);
  if (storeHasEntries && endIndex != lastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  SyncIndex targetCommitIndex = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, targetCommitIndex) < 0) ? -1 : 0;
}

// Start raft on an opened node and arm the ping timer.
// With more than one replica the node starts as follower. A single-replica node
// moves to the next term, becomes leader at once and appends a no-op entry.
// Returns the result of syncNodeStartPingTimer().
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
  return ret;
}

// Older start routine: same as syncNodeStart(), except that a single-replica
// leader also calls syncMaybeAdvanceCommitIndex() after appending the no-op,
// and nothing is returned.
void syncNodeStartOld(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

B
Benguang Zhao 已提交
1224
// Start the node in stand-by mode: follower state, heartbeat timers stopped,
// election timer restarted with TIMER_MAX_MS, ping timer armed.
// Returns the result of syncNodeStartPingTimer().
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  ASSERT(ret == 0);

  ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
  return ret;
}

M
Minghao Li 已提交
1240
// Quiesce a node before syncNodeClose():
//   1. wait, polling every 20 ms, until the FSM reports 0 (or -1) items in its
//      apply queue, when an FpApplyQueueItems callback is registered
//   2. force-stop (if started) and destroy the new-node snapshot receiver
//   3. stop the election and heartbeat timers
// A NULL node is ignored.
void syncNodePreClose(SSyncNode* pSyncNode) {
  // the receiver and timer code below dereferences the node unconditionally,
  // so a NULL node has to be rejected up front
  if (pSyncNode == NULL) return;

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);
}

1268
// Free a heartbeat timer data record.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1269

M
Minghao Li 已提交
1270
// Tear down a sync node and free it. A NULL node is ignored.
// Order: raft store, replication manager, response manager, vote managers,
// index managers, log store, log buffer, raft config, then the ping/elect/
// heartbeat timers, the FSM, the snapshot senders, the new-node receiver and
// finally the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  int32_t closeCode = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(closeCode == 0);
  pSyncNode->pRaftStore = NULL;

  syncNodeLogReplMgrDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  // snapshot senders: stop the ones still running, then destroy
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = (pSyncNode->senders)[i];
    if (pSender == NULL) {
      continue;
    }

    sSTrace(pSender, "snapshot sender destroy while close, data:%p", pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    (pSyncNode->senders)[i] = NULL;
  }

  // snapshot receiver for a newly joined node
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1329
// Return the snapshot strategy recorded in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1330

M
Minghao Li 已提交
1331 1332 1333
// timer control --------------
// Arm the ping timer and copy the user-side logic clock into the timer-side one.
// When the sync environment is not initialised only an error is logged.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: advance the user-side logic clock, stop the timer and
// clear the handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
// The expected execution time and the current election logic clock are recorded
// in electTimerParam before the timer is reset; the node's rid is passed as the
// timer parameter. When the sync environment is not initialised only an error
// is logged. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stop the election timer: advance its logic clock, stop the timer and clear
// the handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and start it again with a timeout of `ms`
// milliseconds. The results of both steps are ignored; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1388 1389
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a stand-by
// node, otherwise a random value drawn from [electBaseLine, 2 * electBaseLine]
// by syncUtilElectRandomMS(). Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return ret;
}

M
Minghao Li 已提交
1404
// Arm the node-wide heartbeat timer and copy the user-side logic clock into the
// timer-side one. When the sync environment is not initialised an error is
// logged instead. The trace line is written in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1418
// Start the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer slot. The result of the individual
// starts is ignored; always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1436 1437
// Stop the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer slot. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return 0;
}

1455 1456 1457 1458 1459 1460
// Stop and then start the heartbeat timers of all peers. The results of both
// steps are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1461 1462 1463
// utils --------------
// Send pMsg to the node identified by destRaftId.
// Returns 0 after handing the message to the send callback. Returns -1 (and frees
// pMsg->pCont) when no send callback is installed.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilRaftId2EpSet(destRaftId, &destEpSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // convert the payload to network byte order before it leaves this node
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&destEpSet, pMsg);

  return 0;
}

// Send pMsg to the node described by nodeInfo.
//
// Returns 0 after handing the message to the send callback.
// Returns -1 when no send callback is installed; in that case pMsg->pCont is freed here,
// matching syncNodeSendMsgById, so the message content is not leaked and the caller can
// see that nothing was sent.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // convert the payload to network byte order before it leaves this node
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);

  return 0;
}

1495
// Tell whether this node appears in the given configuration.
// Membership is computed twice (by fqdn/port and by raft id) and the two answers are
// asserted to agree.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEndpoint = false;
  bool foundByRaftId = false;

  // pass 1: textual comparison of fqdn and port
  for (int32_t idx = 0; idx < config->replicaNum && !foundByEndpoint; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];
    foundByEndpoint = (strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                      (pInfo->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  // pass 2: comparison of the derived raft id
  for (int32_t idx = 0; idx < config->replicaNum && !foundByRaftId; ++idx) {
    const SNodeInfo* pInfo = &config->nodeInfo[idx];

    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pInfo->nodeFqdn, pInfo->nodePort);
    candidate.vgId = pSyncNode->vgId;

    if (syncUtilSameId(&candidate, &(pSyncNode->myRaftId))) {
      foundByRaftId = true;
    }
  }

  ASSERT(foundByEndpoint == foundByRaftId);
  return foundByEndpoint;
}

1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
// Compare two configurations.
// Returns true when the replica count, this node's index, or any replica's fqdn/port
// differs; false when all of them match.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];

    bool sameFqdn = (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0);
    bool samePort = (pBefore->nodePort == pAfter->nodePort);
    if (!sameFqdn || !samePort) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1535
// Switch this node to a new raft configuration.
//
// Nothing is done when the new configuration equals the current one. Otherwise the new
// configuration and lastConfigChangeIndex are stored in pRaftCfg. If this node is a member
// of the new configuration, the peer/replica tables, quorum, index managers, vote managers
// and snapshot senders are rebuilt (senders of replicas present before and after the change
// are reused), the configuration is persisted, and the node re-enters its leader or follower
// role. If this node is not a member, the configuration is only persisted.
//
// pSyncNode:             node being reconfigured
// pNewConfig:            configuration to switch to
// lastConfigChangeIndex: index recorded as the last configuration change
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being removed when it was a member before but is not any more
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders, together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerCnt = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerCnt] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerCnt++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: move each surviving sender to the slot of its replica in the new config
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
          sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

          (pSyncNode->senders)[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                  i, host, port, (pSyncNode->senders)[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
      } else {
        sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
      }
    }

    // free old: whatever was not moved above belongs to a replica that is gone
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      // syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
           pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1711 1712 1713 1714
// raft state change --------------
// Adopt a newer term: store it, become follower, and clear the recorded vote.
// Does nothing when term is not greater than the stored current term.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1722 1723 1724 1725 1726 1727
// Store a newer term without changing the node's role or vote.
// Does nothing when term is not greater than the stored current term.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1728
// Step down in response to newTerm.
//  - newTerm older than the current term: ignored.
//  - newTerm newer than the current term: store it, become follower, clear the vote.
//  - newTerm equal to the current term: become follower unless already one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  if (pSyncNode->pRaftStore->currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
            pSyncNode->pRaftStore->currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
          pSyncNode->pRaftStore->currentTerm);

  if (pSyncNode->pRaftStore->currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);

    raftStoreClearVote(pSyncNode->pRaftStore);
    return;
  }

  // same term: only the role may need to change
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}

1754 1755
// Hand the node's response manager to syncRespCleanRsp.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1756
// Put the node into follower state.
// debugStr is only used in the trace line emitted at the end.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a leader stepping down no longer knows who the leader is
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // switch role and stop the per-peer heartbeat timers
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // restart the election timer
  syncNodeResetElectTimer(pSyncNode);

  // answer pending client requests
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // forget the minimum match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset the log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1807
// Put the node into leader state.
//
// Records the leader timestamp, resets per-leadership counters and flags, sets nextIndex of
// every replica to (last index + 1) and matchIndex to SYNC_INDEX_INVALID, re-initializes peer
// state, force-stops a running snapshot receiver, stops the election timer, starts the
// heartbeat timers, sends a heartbeat immediately, notifies the state machine, and resets the
// log buffer. debugStr is only used in the final log line.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index is the same for every replica, so look it up once instead of once per
  // loop iteration. The lookup also takes the snapshot into account (maybe wal is deleted).
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver (pSyncNode itself was already dereferenced above, so it needs no NULL check)
  if (pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition.
// Asserts the node is a candidate holding a majority of votes, becomes leader, then
// appends a noop entry and logs the resulting term / commit index / last index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // a failed noop append is reported but does not undo the transition
  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLastIndex >= 0);

  sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);
}

// Older candidate -> leader transition.
// Asserts the node is a candidate holding a majority of votes, becomes leader, appends a
// noop entry, tries to advance the commit index, and replicates when there are other replicas.
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  const bool hasOtherReplicas = (pSyncNode->replicaNum > 1);
  if (hasOtherReplicas) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1920 1921
// True when the node's vgId is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
1922
// Reset the send bookkeeping of all TSDB_MAX_REPLICA peer-state slots. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    slot++;
  }

  return 0;
}

// Follower -> candidate transition: asserts the current role, switches the state and logs it.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition: asserts the current role, becomes follower and logs it.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition: asserts the current role, becomes follower and logs it.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1961 1962
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1963
// Record a vote for pRaftId in the raft store.
// Asserts that term is the current term and that no vote has been cast yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(pSyncNode->pRaftStore->currentTerm == term);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1970
// simulate get vote from outside
M
Minghao Li 已提交
1971
// Vote for this node in the current term, then feed a locally built, granted
// request-vote reply (from self to self) into the vote bookkeeping.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);

  SRpcMsg selfReply = {0};
  if (syncBuildRequestVoteReply(&selfReply, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = selfReply.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = pSyncNode->pRaftStore->currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  // the reply never leaves this function, so release it here
  rpcFreeCont(selfReply.pCont);
}

M
Minghao Li 已提交
1989
// return if has a snapshot
M
Minghao Li 已提交
1990 1991
// True when the state machine reports a snapshot whose lastApplyIndex is at least
// SYNC_INDEX_BEGIN; false when it reports none or has no snapshot-info callback.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  return snap.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
2002 2003
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
2004
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2005
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2006 2007
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2008 2009 2010 2011 2012 2013 2014
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2015 2016
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2017 2018
// Term of the newest entry known to this node.
// Without a snapshot this is the log store's last term. With a snapshot it is the log
// store's last term when the log reaches past the snapshot, else the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLast > snap.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  return snap.lastApplyTerm;
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex / syncNodeGetLastTerm.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = idx;

  SyncTerm term = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = term;

  return 0;
}
M
Minghao Li 已提交
2047

M
Minghao Li 已提交
2048
// return append-entries first try index
M
Minghao Li 已提交
2049 2050 2051 2052 2053
// The index right after the node's last index.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2054 2055
// if index > 0, return index - 1
// else, return -1
2056 2057 2058 2059 2060 2061 2062 2063 2064
// index - 1, but never lower than SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prev = index - 1;
  return (prev < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prev;
}

M
Minghao Li 已提交
2065 2066 2067 2068
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2069 2070 2071 2072 2073 2074 2075 2076 2077
// Term of the entry preceding `index`.
//  - index below SYNC_INDEX_BEGIN: SYNC_TERM_INVALID
//  - index equal to SYNC_INDEX_BEGIN: 0
//  - otherwise the term of entry (index - 1), taken from the log-store cache or, on a cache
//    miss, from the log store itself; if the entry cannot be read, the snapshot's
//    lastApplyTerm is used when the snapshot ends exactly at (index - 1)
//  - SYNC_TERM_INVALID when none of the above yields a term
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex prevIdx = index - 1;

  SLRUCache*      pLru = pSyncNode->pLogStore->pCache;
  LRUHandle*      hCache = taosLRUCacheLookup(pLru, &prevIdx, sizeof(prevIdx));
  SSyncRaftEntry* pPrev = NULL;
  int32_t         rc = 0;

  if (hCache != NULL) {
    pPrev = (SSyncRaftEntry*)taosLRUCacheValue(pLru, hCache);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", prevIdx, pPrev->bytes, pPrev);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, prevIdx);

    rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, prevIdx, &pPrev);
  }

  if (rc == 0) {
    ASSERT(pPrev != NULL);
    SyncTerm prevTerm = pPrev->term;

    // a cached entry is released back to the cache, a loaded one is destroyed
    if (hCache != NULL) {
      taosLRUCacheRelease(pLru, hCache, false);
    } else {
      syncEntryDestroy(pPrev);
    }

    return prevTerm;
  }

  // entry not readable: fall back to the snapshot boundary
  SSnapshot snap = {.data = NULL,
                    .lastApplyIndex = SYNC_INDEX_INVALID,
                    .lastApplyTerm = SYNC_TERM_INVALID,
                    .lastConfigIndex = SYNC_INDEX_INVALID};

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
    if (snap.lastApplyIndex == prevIdx) {
      return snap.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snap.lastApplyIndex, snap.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2128 2129 2130 2131

// get pre index and term of "index"
// Fill *pPreIndex and *pPreTerm from syncNodeGetPreIndex / syncNodeGetPreTerm.
// Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2136
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2137
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2138

S
Shengliang Guan 已提交
2139 2140 2141
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2142
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2143 2144
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2145
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2146 2147
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2148
    }
M
Minghao Li 已提交
2149

M
Minghao Li 已提交
2150
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2151 2152
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2153
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2154 2155
      rpcFreeCont(rpcMsg.pCont);
      return;
2156
    }
M
Minghao Li 已提交
2157

S
Shengliang Guan 已提交
2158
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2159
  }
M
Minghao Li 已提交
2160 2161
}

M
Minghao Li 已提交
2162
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2163
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2164

M
Minghao Li 已提交
2165 2166
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2167

2168
  if (pNode == NULL) return;
M
Minghao Li 已提交
2169 2170 2171 2172 2173

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2174

2175
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2176 2177 2178 2179
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2180

S
Shengliang Guan 已提交
2181
  SRpcMsg rpcMsg = {0};
2182 2183
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2184

S
Shengliang Guan 已提交
2185
  if (code != 0) {
M
Minghao Li 已提交
2186
    sError("failed to build elect msg");
M
Minghao Li 已提交
2187
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2188
    return;
M
Minghao Li 已提交
2189 2190
  }

S
Shengliang Guan 已提交
2191
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2192
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2193 2194 2195

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2196
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2197
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2198
    syncNodeRelease(pNode);
2199
    return;
M
Minghao Li 已提交
2200
  }
M
Minghao Li 已提交
2201 2202

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2203 2204
}

M
Minghao Li 已提交
2205
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2206
  if (!syncIsInit()) return;
2207

S
Shengliang Guan 已提交
2208 2209 2210 2211
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2212
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2213 2214 2215
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2216
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2217
        return;
2218
      }
M
Minghao Li 已提交
2219

2220
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2221 2222
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2223
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2224 2225
        rpcFreeCont(rpcMsg.pCont);
        return;
2226
      }
S
Shengliang Guan 已提交
2227 2228 2229 2230

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2231
    } else {
S
Shengliang Guan 已提交
2232 2233
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2234
    }
M
Minghao Li 已提交
2235 2236 2237
  }
}

2238
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2239
  int64_t hbDataRid = (int64_t)param;
2240
  int64_t tsNow = taosGetTimestampMs();
2241

2242 2243
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2244
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2245 2246
    return;
  }
2247

2248
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2249
  if (pSyncNode == NULL) {
2250
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2251
    sError("hb timer get pSyncNode NULL");
2252 2253 2254 2255 2256 2257 2258 2259
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2260
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2261 2262 2263
    return;
  }

M
Minghao Li 已提交
2264
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2265 2266
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2267
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2268 2269 2270
    return;
  }

M
Minghao Li 已提交
2271
  if (pSyncNode->pRaftStore == NULL) {
2272 2273
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2274
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2275 2276 2277
    return;
  }

M
Minghao Li 已提交
2278
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2279 2280

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2281 2282 2283
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2284
    if (timerLogicClock == msgLogicClock) {
2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2305
        pSyncMsg->timeStamp = tsNow;
2306 2307 2308 2309 2310 2311

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2312 2313
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2314 2315 2316 2317 2318 2319 2320 2321
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2322 2323
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2324 2325
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2326 2327 2328 2329
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2330
    } else {
M
Minghao Li 已提交
2331 2332
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2333 2334
    }
  }
2335 2336 2337

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2338 2339
}

2340 2341 2342 2343 2344
// Build a noop raft entry for the current term and the next write index, wrap it in a
// client-request message and enqueue it through pNode->syncEqMsg.
//
// Only a leader may propose: when the node is NOT leader, terrno is set to
// TSDB_CODE_SYN_NOT_LEADER and -1 is returned.
//
// Returns 0 on success, -1 when the entry or the message cannot be built, otherwise the
// non-zero code reported by the enqueue callback.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  // the entry is destroyed right after the build call, whether or not it succeeded
  syncEntryDestroy(pEntry);
  if (code != 0) {
    sError("failed to build noop msg");
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    // same cleanup as the timer callbacks in this file perform on enqueue failure
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2364 2365
// Deleter passed to taosLRUCacheInsert: frees the cached value. key and keyLen are unused.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2366 2367 2368 2369
// Insert pEntry into the log store's LRU cache, keyed by pEntry->index, with
// deleteCacheEntry as the deleter and low priority. The handle is returned through h.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // charge = entry header plus payload
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2381 2382 2383
// Append pEntry to the node's log buffer and advance the match index.
// With more than one replica the function stops there. With a single replica it also
// updates the commit index to the new match index and commits the log buffer up to it.
// Returns 0 on success, -1 when the append or the single-replica commit fails.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  int32_t appendCode = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendCode < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index);
    return -1;
  }

  // advance the match index
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  if (ths->replicaNum <= 1) {
    // single replica: commit right away
    (void)syncNodeUpdateCommitIndex(ths, matchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2412
// Report whether at least "quorum" peers have a recorded receive time in pMatchIndex that
// is older than tsHeartbeatTimeout milliseconds. Peers whose recorded time is 0 or -1 are
// not counted. Always false for a single-replica node.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) {
    return false;
  }

  const int64_t now = taosGetTimestampMs();
  int32_t       timedOutPeers = 0;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    noRecvTime = (recvTime == 0 || recvTime == -1);

    if (!noRecvTime && (now - recvTime > tsHeartbeatTimeout)) {
      timedOutPeers++;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453
// True when any of the node's snapshot senders (one slot per replica) exists and has its
// "start" flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}

// True when the node has a snapshot receiver (pNewNodeReceiver) whose "start" flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2454
// Build a noop raft entry at the log buffer's end index for the current term and append
// it through syncNodeAppend().
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry
// cannot be built, or the negative result of syncNodeAppend() when the append fails
// (previously that result was stored and then dropped, so failures were reported as 0).
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t ret = syncNodeAppend(ths, pEntry);
  if (ret < 0) {
    sNError(ths, "failed to append noop entry, index:%" PRId64, index);
  }
  return ret;
}

// Legacy noop append: build a noop entry at the log store's write index and, when the
// node is leader, append it to the log store and put it into the entry cache.
//
// Ownership of the entry: if it ends up in the cache (handle h obtained) only the handle
// is released; in every other case, including a failed log append, the entry is destroyed
// here so that it cannot leak.
//
// Returns 0 on success, -1 when the entry cannot be built (terrno is set to
// TSDB_CODE_OUT_OF_MEMORY) or the log append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2497 2498
// Handle a heartbeat message received from a peer.
//
// Steps:
//   1. Log the receipt and build a heartbeat reply addressed to the sender.
//   2. If the heartbeat's term equals the local term and this node is not leader: record
//      the receive time, reset the election timer, adopt the sender's minMatchIndex and,
//      when follower, enqueue a SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the
//      sender's commit index.
//   3. If the heartbeat's term is >= the local term and this node is not follower:
//      enqueue a SYNC_LOCAL_CMD_STEP_DOWN local command carrying the sender's term.
//   4. Send the reply.
//
// Values that are logged after a local command has been enqueued are copied beforehand;
// the message body is not read again once it has been handed to the enqueue callback.
// A local command that cannot be enqueued (callback missing or failing) is freed here.
//
// Returns 0, or -1 when the reply message cannot be built.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build heartbeat reply msg", ths->vgId);
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
        sError("vgId:%d, failed to build fc-commit msg", ths->vgId);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;
        SyncIndex fcIndex = pSyncMsg->fcIndex;  // copied for logging after the enqueue

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          // nobody will consume the message, so release it here
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
      sError("vgId:%d, failed to build step-down msg", ths->vgId);
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          // log the term from the incoming heartbeat, not from the already enqueued message
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pMsg->term);
        }
      } else {
        // nobody will consume the message, so release it here
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2579
// Handle a heartbeat reply from a peer: look up the peer's log replication manager, log
// the receipt, record the receive time in pMatchIndex, then let the replication manager
// process the reply.
//
// Returns -1 when no replication manager exists for the sender, otherwise the result of
// syncLogReplMgrProcessHeartbeatReply().
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" must follow the '%': print the address as 16 zero-padded hex digits
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2599
// Legacy heartbeat-reply handler: log the receipt and record the time the reply arrived
// in pMatchIndex for the sending peer. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            tbuf[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, tbuf);

  int64_t now = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, now - pReply->timeStamp, tbuf);

  // remember when this peer last replied
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, now);
  return 0;
}

S
Shengliang Guan 已提交
2615 2616
// Execute a local command message.
//   SYNC_LOCAL_CMD_STEP_DOWN    -> step down to the term carried in sdNewTerm
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> update the commit index to fcIndex and commit the log
//                                  buffer up to the node's commit index
// Any other command is only logged. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      (void)syncNodeUpdateCommitIndex(ths, pCmd->fcIndex);
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

// Legacy local command handler.
//   SYNC_LOCAL_CMD_STEP_DOWN    -> step down to the term carried in sdNewTerm
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> run the follower commit up to fcIndex
// Any other command is only logged. Always returns 0.
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pCmd->fcIndex);
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2652 2653 2654 2655 2656 2657 2658 2659 2660 2661
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2662

2663
// Turn a client request into a raft entry and, when this node is leader, append it.
//
// The entry is built at the log buffer's end index for the current term, from the
// embedded client request when msgType is TDMT_SYNC_CLIENT_REQUEST, otherwise from the
// rpc message itself.
//
// pRetIndex (optional) receives the entry's index, but only when the node is leader.
//
// Returns the result of syncNodeAppend() when leader. Returns -1 when the entry cannot
// be built, or when the node is not leader (the freshly built entry is destroyed in that
// case so that it does not leak).
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  // Evaluated before the append: this function never frees the entry after handing it to
  // syncNodeAppend(), so presumably ownership moves there and the entry must not be read
  // afterwards.
  bool isBlockMsg = vnodeIsMsgBlock(pEntry->originalRpcType);

  int32_t code = syncNodeAppend(ths, pEntry);
  if (code < 0 && ths->vgId != 1 && isBlockMsg) {
    ASSERTS(false, "failed to append blocking msg");
  }
  return code;
}

2692 2693
// Legacy client-request handler.
//
// Builds a raft entry at the log store's write index for the current term. When the node
// is leader the entry is appended to the log store and cached; with several replicas
// replication is started, with a single replica the commit index may be advanced at once.
//
// If the log append fails, -1 is returned after destroying the entry; with more than one
// replica FpCommitCb is first invoked with code -1.
//
// pRetIndex (optional) receives the entry's index, or SYNC_INDEX_INVALID when no entry
// was built. Returns 0 unless the log append failed.
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm  term = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType != TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  } else {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  }

  LRUHandle* hCache = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t appendCode = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (appendCode != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }

      // the entry has not been cached yet (no handle), so it is destroyed directly
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &hCache);

    // if multi replica, start replicate right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = (pEntry != NULL) ? pEntry->index : SYNC_INDEX_INVALID;
  }

  if (hCache != NULL) {
    taosLRUCacheRelease(ths->pLogStore->pCache, hCache, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
M
Minghao Li 已提交
2781

S
Shengliang Guan 已提交
2782 2783 2784
// Map a sync state to its lowercase name; any unlisted value maps to "unknown".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) return "follower";
  if (state == TAOS_SYNC_STATE_CANDIDATE) return "candidate";
  if (state == TAOS_SYNC_STATE_LEADER) return "leader";
  if (state == TAOS_SYNC_STATE_ERROR) return "error";
  if (state == TAOS_SYNC_STATE_OFFLINE) return "offline";
  return "unknown";
}
2798

2799
#if 0
// Disabled (compiled out). Handles a committed leader-transfer entry on a follower:
// when this node is the designated new leader its election timer is restarted with a
// 1 ms timeout, and FpLeaderTransferCb is invoked if the fsm provides it.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  // this node is the target when either the raft id or the fqdn:port pair matches
  bool sameId = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (sameId || sameNodeInfo) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2866
// Find this node inside pNewCfg and store its position in pNewCfg->myIndex.
// Each configured node is turned into a raft id (address from fqdn/port, vgId from this
// node) and compared with ths->myRaftId.
// Returns 0 when found, -1 when this node is not part of the new configuration.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;
  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[pos].nodeFqdn, (pNewCfg->nodeInfo)[pos].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }

  return -1;
}

2881 2882 2883 2884
// True when all three hold: the node has exactly one replica, the message type is a
// user-commit type according to syncUtilUserCommit(), and vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2885
// Apply the log entries in [beginIndex, endIndex] to the state machine.
//
// NOTE: the function starts with ASSERT(false), i.e. it is not expected to be called any
// more; the remaining logic is kept as it was.
//
// Behaviour:
//   - returns 0 at once for an empty range, -1 for a NULL node;
//   - if the fsm reports a snapshot whose lastApplyIndex is >= beginIndex, the range
//     starts right after the snapshot;
//   - each entry is taken from the log store's LRU cache, or read from the log store on a
//     cache miss (a failed read is logged and the index is skipped);
//   - user-commit entries are passed to FpCommitCb, except on a single-replica node whose
//     restore has finished and whose vgId is not 1;
//   - when the entry at the log store's last index is reached for the first time,
//     FpRestoreFinishCb is invoked (if set) and restoreFinish becomes true.
//
// "flag" is forwarded to the commit callback through SFsmCbMeta.flag.
// Always returns 0 once the range checks have passed.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  ASSERT(false);
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) {
      continue;
    }

    // Must start as NULL: it is tested below after a failed syncLogGetEntry(), which is
    // not guaranteed to have written to it.
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

      ths->pLogStore->cacheHit++;
      sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);

    } else {
      ths->pLogStore->cacheMiss++;
      sNTrace(ths, "miss cache index:%" PRId64, i);

      int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (code != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      bool internalExecute = true;
      if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
        internalExecute = false;
      }

      sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
              TMSG_INFO(pEntry->msgType));

      // execute fsm in apply thread, or execute outside syncPropose
      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

    // restore finish
    // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
      if (ths->restoreFinish == false) {
        if (ths->pFsm->FpRestoreFinishCb != NULL) {
          ths->pFsm->FpRestoreFinishCb(ths->pFsm);
        }
        ths->restoreFinish = true;

        int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
        sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
      }
    }

    rpcFreeCont(rpcMsg.pCont);
    // a cached entry only has its handle released; an entry read from the log store is destroyed
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pEntry);
    }
  }

  return 0;
}

// Check whether pRaftId matches any of the first ths->replicaNum entries of
// ths->replicasId (compared with syncUtilSameId).
//   ths      sync node whose replica id table is searched
//   pRaftId  id to look for
// Returns true on the first match, false when no entry matches.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool    found = false;
  int32_t idx = 0;

  // stop scanning as soon as one entry matches
  while (!found && idx < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      found = true;
    }
    ++idx;
  }

  return found;
}

// Look up the snapshot sender stored at the same slot as the replica id that
// matches pDestId.
//   ths      sync node owning the replicasId / senders tables
//   pDestId  id of the destination replica
// Returns the entry of ths->senders for the matching slot, or NULL when no
// replica id matches. The scan does not stop early, so if several slots match
// the last one is returned.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pMatched = NULL;
  int32_t              slot = 0;

  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pMatched = ths->senders[slot];
    }
    ++slot;
  }

  return pMatched;
}
M
Minghao Li 已提交
3026

3027 3028
// Look up the per-peer heartbeat timer stored at the same slot as the replica
// id that matches pDestId.
//   ths      sync node owning the replicasId / peerHeartbeatTimerArr tables
//   pDestId  id of the destination replica
// Returns a pointer into ths->peerHeartbeatTimerArr, or NULL when no replica
// id matches. Every slot is visited, so the last matching slot wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  for (int32_t k = 0; k < ths->replicaNum; k++) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[k])) {
      continue;
    }
    pFound = &ths->peerHeartbeatTimerArr[k];
  }

  return pFound;
}

M
Minghao Li 已提交
3037 3038
// Look up the peer state stored at the same slot as the replica id that
// matches pDestId.
//   ths      sync node owning the replicasId / peerStates tables
//   pDestId  id of the destination replica
// Returns a pointer into ths->peerStates, or NULL when no replica id matches.
// Every slot is visited, so the last matching slot wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pResult = NULL;
  int32_t     pos = 0;

  while (pos < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[pos])) {
      pResult = &ths->peerStates[pos];
    }
    pos++;
  }

  return pResult;
}

// Decide whether an append-entries message should be sent to pDestId.
//   ths      sync node that would send the message
//   pDestId  id of the destination replica
//   pMsg     message to send; only pMsg->prevLogIndex is read
// Returns false when no peer state exists for pDestId (an error is logged),
// or when the index to send (prevLogIndex + 1) equals the peer's lastSendIndex
// and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed since lastSendTime.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (!pPeer) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // send when the index differs from the last one sent, or the last send has timed out
  return pPeer->lastSendIndex != nextIndex || nowMs - pPeer->lastSendTime >= SYNC_APPEND_ENTRIES_TIMEOUT_MS;
}

M
Minghao Li 已提交
3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077
// Report whether the node passes the three checks below; each failing check
// logs an error and yields false.
//   1. pSyncNode->changing must not be set.
//   2. If commitIndex >= SYNC_INDEX_BEGIN, it must equal the value returned by
//      syncNodeGetLastIndex().
//   3. No peer may have a snapshot sender whose `start` flag is set.
// Returns true only when all checks pass.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastLogIndex = syncNodeGetLastIndex(pSyncNode);
    if (lastLogIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}