syncMain.c 98.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
/*
 * Open a sync node from pSyncInfo and register it via syncNodeAdd().
 *
 * Returns the reference id stored in the node, or -1 when the node cannot be
 * opened or syncNodeAdd() yields a negative id (the node is closed in that
 * case).
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // copy the timing parameters and the message callbacks supplied by the caller
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
B
Benguang Zhao 已提交
88 89 90 91 92
    sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid);
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

S
Shengliang Guan 已提交
127 128 129
/*
 * A new config is accepted only if syncNodeInConfig() holds for it and its
 * replica count differs from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  if (abs(pCfg->replicaNum - pSyncNode->replicaNum) > 1) {
    return false;
  }
  return true;
}

S
Shengliang Guan 已提交
132
/*
 * Apply a new replica configuration to the sync node referenced by rid.
 *
 * Returns 0 on success. Returns -1 when the node cannot be acquired, or when
 * syncNodeCheckNewConfig() rejects pNewCfg (terrno is then set to
 * TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 *
 * When the node is currently leader, the heartbeat timer is stopped, every
 * per-peer heartbeat timer slot is re-initialized from replicasId[], and the
 * heartbeat timer is started again.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held; the node must not be
    // dereferenced once syncNodeRelease() has been called.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    // syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
160

S
Shengliang Guan 已提交
161 162 163 164
/*
 * Dispatch one sync message to the handler matching pMsg->msgType.
 *
 * Returns the handler's result, or -1 when the sync environment is not
 * initialized, the node cannot be acquired, or the message type is unknown.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  if (!syncIsInit()) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return -1;
  }

  int32_t result = -1;
  switch (pMsg->msgType) {
    // heartbeat traffic
    case TDMT_SYNC_HEARTBEAT:
      result = syncNodeOnHeartbeat(pNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      result = syncNodeOnHeartbeatReply(pNode, pMsg);
      break;

    // locally generated events
    case TDMT_SYNC_TIMEOUT:
      result = syncNodeOnTimeout(pNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      result = syncNodeOnClientRequest(pNode, pMsg, NULL);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      result = syncNodeOnLocalCmd(pNode, pMsg);
      break;

    // election
    case TDMT_SYNC_REQUEST_VOTE:
      result = syncNodeOnRequestVote(pNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      result = syncNodeOnRequestVoteReply(pNode, pMsg);
      break;

    // log replication
    case TDMT_SYNC_APPEND_ENTRIES:
      result = syncNodeOnAppendEntries(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      result = syncNodeOnAppendEntriesReply(pNode, pMsg);
      break;

    // snapshot transfer
    case TDMT_SYNC_SNAPSHOT_SEND:
      result = syncNodeOnSnapshot(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      result = syncNodeOnSnapshotRsp(pNode, pMsg);
      break;

    default:
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pNode->vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      result = -1;
      break;
  }

  syncNodeRelease(pNode);
  return result;
}

S
Shengliang Guan 已提交
212
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
213
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
214
  if (pSyncNode == NULL) return -1;
215

S
Shengliang Guan 已提交
216
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
217
  syncNodeRelease(pSyncNode);
218 219 220
  return ret;
}

221
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
222
  SSyncNode* pNode = syncNodeAcquire(rid);
223
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
224 225

  SRpcMsg rpcMsg = {0};
226
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
227 228 229
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
230
  if (ret == 1) {
231
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
232 233
          rpcMsg.info.ahandle);
    rpcSendResponse(&rpcMsg);
234 235
    return 0;
  } else {
236
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
237
    return -1;
238
  }
S
Shengliang Guan 已提交
239 240
}

M
Minghao Li 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
/*
 * Return the smallest match index recorded for any peer, or
 * SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    lowest = (current < lowest) ? current : lowest;
  }

  return lowest;
}

257
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
258
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
259
  if (pSyncNode == NULL) {
260
    sError("sync begin snapshot error");
261 262
    return -1;
  }
263

264 265
  int32_t code = 0;

M
Minghao Li 已提交
266
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
267 268 269
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
270 271 272
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
273 274 275
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
276 277
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
278
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
279 280 281
      return 0;
    }

M
Minghao Li 已提交
282 283 284
    goto _DEL_WAL;

  } else {
285 286 287 288 289 290 291 292 293 294 295 296
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
311 312 313 314
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
315 316
            } while (0);

S
Shengliang Guan 已提交
317
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
318 319 320 321 322 323
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
324 325 326
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
327
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
328 329 330 331
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
332
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
333
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
334 335 336
        return 0;

      } else {
S
Shengliang Guan 已提交
337
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
338
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
339 340 341 342 343 344 345 346 347
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
348 349 350
    }
  }

M
Minghao Li 已提交
351
_DEL_WAL:
352

M
Minghao Li 已提交
353
  do {
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
374

M
Minghao Li 已提交
375
      } else {
376 377
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
378
      }
379
    }
M
Minghao Li 已提交
380
  } while (0);
381

S
Shengliang Guan 已提交
382
  syncNodeRelease(pSyncNode);
383 384 385 386
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
387
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
388
  if (pSyncNode == NULL) {
389
    sError("sync end snapshot error");
390 391 392
    return -1;
  }

393 394 395 396
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
397
    if (code != 0) {
398
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
399
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
400 401
      return -1;
    } else {
S
Shengliang Guan 已提交
402
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
403 404
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
405
  }
406

S
Shengliang Guan 已提交
407
  syncNodeRelease(pSyncNode);
408 409 410
  return code;
}

M
Minghao Li 已提交
411
/*
 * Call syncNodeStepDown() with newTerm on the node referenced by rid.
 * Returns 0, or -1 when the node cannot be acquired.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);

  syncNodeRelease(pNode);
  return 0;
}

423
/*
 * Tell whether the node may serve reads.
 *
 * Not ready (terrno set) when: pSyncNode is NULL
 * (TSDB_CODE_SYN_INTERNAL_ERROR), or the node is not leader
 * (TSDB_CODE_SYN_NOT_LEADER). Ready immediately when restoreFinish is set.
 * Otherwise the node is ready only if the FSM apply queue is empty, the log
 * store is not empty, and the last log entry is a TDMT_SYNC_NOOP entry of the
 * current term; if not, terrno is set to TSDB_CODE_SYN_RESTORING.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  bool isReady = false;

  // only inspect the log once the apply queue has drained
  if (pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm) &&
      !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SSyncRaftEntry* pLast = NULL;
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      hCache = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
    int32_t         rc = 0;

    if (hCache != NULL) {
      // entry is borrowed from the cache
      pLast = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hCache);

      pSyncNode->pLogStore->cacheHit++;
      sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pLast->bytes, pLast);
    } else {
      // entry is fetched from the log store
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);

      rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pLast);
    }

    if (rc == 0 && pLast != NULL) {
      if (pLast->originalRpcType == TDMT_SYNC_NOOP && pLast->term == pSyncNode->pRaftStore->currentTerm) {
        isReady = true;
      }

      // give the entry back the same way it was obtained
      if (hCache != NULL) {
        taosLRUCacheRelease(pCache, hCache, false);
      } else {
        syncEntryDestroy(pLast);
      }
    }
  }

  if (!isReady) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }

  return isReady;
}

/*
 * Public wrapper around syncNodeIsReadyForRead() that resolves rid to a node.
 * Returns false when the node cannot be acquired.
 */
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync ready for read error");
    return false;
  }

  bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);

  return isReady;
}
M
Minghao Li 已提交
498

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
/*
 * Return the result of syncNodeSnapshotSending() for the node referenced by
 * rid, or false when the node cannot be acquired.
 */
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return false;

  bool isSending = syncNodeSnapshotSending(pNode);

  syncNodeRelease(pNode);
  return isSending;
}

/*
 * Return the result of syncNodeSnapshotRecving() for the node referenced by
 * rid, or false when the node cannot be acquired.
 */
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return false;

  bool isRecving = syncNodeSnapshotRecving(pNode);

  syncNodeRelease(pNode);
  return isRecving;
}

M
Minghao Li 已提交
521 522
/*
 * Pick a peer and hand leadership over to it.
 *
 * Fails with TSDB_CODE_SYN_ONE_REPLICA when the node has no peers. When the
 * node is not leader (or has a single replica) nothing is done and 0 is
 * returned. The first peer is the default target; with exactly two peers the
 * second is chosen instead if its match index is strictly greater.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
544 545
/*
 * Propose a leader-transfer message naming newLeader as the next leader.
 *
 * Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has a
 * single replica, -1 when the transfer message cannot be built, and otherwise
 * the result of syncNodePropose(). The message content is freed here after
 * the proposal.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  if (code != 0 || rpcMsg.pCont == NULL) {
    // Without this check a failed build leads to a NULL dereference below.
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

566 567
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
568

S
Shengliang Guan 已提交
569
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
570 571 572
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
573 574 575 576 577
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
578
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
579 580
  }

581
  return state;
M
Minghao Li 已提交
582 583
}

584
#if 0
585 586 587 588 589
/*
 * Fill pSnapshot from the log entry stored at index.
 * Returns 0 on success; -1 when index is below SYNC_INDEX_BEGIN, the node
 * cannot be acquired, or the entry cannot be read.
 */
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return -1;
  ASSERT(rid == pNode->rid);

  SSyncRaftEntry* pLogEntry = NULL;
  int32_t         rc = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pLogEntry);
  if (rc != 0) {
    // an entry may still have been handed back on failure
    if (pLogEntry != NULL) {
      syncEntryDestroy(pLogEntry);
    }
    syncNodeRelease(pNode);
    return -1;
  }
  ASSERT(pLogEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pLogEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

  syncEntryDestroy(pLogEntry);
  syncNodeRelease(pNode);
  return 0;
}

617
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
618
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
619 620 621
  if (pSyncNode == NULL) {
    return -1;
  }
622
  ASSERT(rid == pSyncNode->rid);
623 624
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

S
Shengliang Guan 已提交
625
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
626

S
Shengliang Guan 已提交
627
  syncNodeRelease(pSyncNode);
628 629 630
  return 0;
}

631
/*
 * Store in sMeta the largest recorded config index that does not exceed
 * snapshotIndex, falling back to the first recorded config index.
 * Returns 0, or -1 when the node cannot be acquired.
 */
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return -1;
  ASSERT(rid == pNode->rid);

  ASSERT(pNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = (pNode->pRaftCfg->configIndexArr)[0];

  for (int32_t pos = 0; pos < pNode->pRaftCfg->configIndexCount; ++pos) {
    SyncIndex candidate = (pNode->pRaftCfg->configIndexArr)[pos];
    if (candidate > best && candidate <= snapshotIndex) {
      best = candidate;
    }
  }

  sMeta->lastConfigIndex = best;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
654
#endif
655

656
/*
 * Return the largest recorded config index that does not exceed
 * snapshotLastApplyIndex, falling back to the first recorded config index.
 * Asserts that at least one config index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->pRaftCfg->configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[pos];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

672 673
/*
 * Fill pEpSet with the endpoints of every replica in the node's raft config.
 *
 * numOfEps is reset to 0 first, so the set stays empty when the node cannot
 * be acquired. When at least one endpoint was copied, inUse is set to the
 * slot following this node's own index (wrapping around).
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return;
  }

  int32_t idx = 0;
  while (idx < pNode->pRaftCfg->cfg.replicaNum) {
    SEp* pDst = &pEpSet->eps[idx];
    tstrncpy(pDst->fqdn, pNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pDst->port = (pNode->pRaftCfg->cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pDst->fqdn, pDst->port);
    ++idx;
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
693
/*
 * Public wrapper around syncNodePropose() that resolves rid to a node.
 * Returns -1 when the node cannot be acquired, otherwise the helper's result.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);

  syncNodeRelease(pNode);
  return result;
}
M
Minghao Li 已提交
704

S
Shengliang Guan 已提交
705
/*
 * Propose a client message on this node.
 *
 * Rejected with -1 when:
 *   - the node is not leader (terrno = TSDB_CODE_SYN_NOT_LEADER);
 *   - restore has not finished and vgId is not 1
 *     (terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY);
 *   - syncNodeHeartbeatReplyTimeout() reports a timeout
 *     (terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * Optimized one-replica path: the request is handled inline through
 * syncNodeOnClientRequest(). On success the apply index/term are written into
 * pMsg->info.conn and 1 is returned; on failure terrno is set to
 * TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned.
 *
 * Regular path: a response stub is registered, a client-request message is
 * built and handed to the syncEqMsg callback. The stub is removed again if
 * building or enqueueing fails. When seq is not NULL it receives the stub's
 * sequence number (also when enqueueing failed). Returns the enqueue result,
 * or -1 if the message could not be built.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized because the failure branch logs it even if the handler
    // never wrote to it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // regular path: register a response stub, then enqueue the request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
767
/*
 * Put a per-peer heartbeat timer into its initial, not-yet-armed state.
 *
 * pSyncNode  - node owning the timer; only its hbBaseLine is read here
 * pSyncTimer - timer slot to initialize
 * destId     - raft id of the peer this timer is bound to
 *
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // binding: which peer, which callback, which period
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state: no timer handle, nothing counted yet
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
778
/*
 * Arm (or re-arm) the heartbeat timer bound to one peer.
 *
 * The timer's SSyncHbTimerData record is looked up through pSyncTimer->hbDataRid; when no
 * record can be acquired a new one is allocated and registered. The record is then filled
 * with a snapshot of the timer state and its rid is handed to the timer callback as the
 * opaque parameter.
 *
 * Returns 0 on success, and also when the sync environment is not initialized (that case is
 * only logged). Returns -1 with terrno set when the timer data cannot be allocated.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // allocation failed: bail out instead of writing through a NULL pointer
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    // snapshot of the timer state that travels with the callback
    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    // the callback is scheduled at timerMS / HEARTBEAT_TICK_NUM, while execTime is tsNow + timerMS
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
804
/*
 * Disarm a per-peer heartbeat timer.
 *
 * The logic clock is incremented first, then the timer is stopped and the associated
 * timer-data record is removed; hbDataRid is reset to -1 afterwards.
 *
 * pSyncNode is not used by this function. Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  // drop the timer handle
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // drop the timer data record
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

814
/*
 * Restore the log store from the FSM snapshot when the log range does not line up with it.
 *
 * The snapshot's lastApplyIndex is compared with the first and last index reported by the
 * log store. A restore is requested when the log ends before that index, or when it begins
 * more than one entry after it.
 *
 * Returns 0 when no restore is needed or the restore succeeded, -1 when the restore failed.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // log range already lines up with the snapshot: nothing to do
  if (!(lastVer < commitIndex || firstVer > commitIndex + 1)) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
834
// open/close --------------
S
Shengliang Guan 已提交
835 836
/*
 * Allocate and initialize a sync node from the caller-provided SSyncInfo.
 *
 * Steps, in order:
 *   1. make sure the sync directory exists;
 *   2. create raft_config.json when it is missing, otherwise reconcile it with the
 *      configuration passed in pSyncInfo->syncCfg;
 *   3. copy paths and callbacks, create the log ring buffer, and open the raft config;
 *   4. derive self/peer/replica ids, then create the raft store, vote managers, index
 *      managers and log store;
 *   5. initialize timers, the response manager, snapshot senders/receiver and the log buffer.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node (pSyncInfo->pFsm is set to NULL). On
 * failure, a pFsm that was not yet moved is freed here, and the partially built node is
 * released through syncNodeClose().
 *
 * Returns the new node, or NULL on failure.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set vgId first so that every log line below reports the real vgroup id
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
      // pRaftCfg has not been opened on this path, so load the file that was just created
      // instead of dereferencing a NULL pointer
      pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
      if (pSyncNode->pRaftCfg == NULL) {
        sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
      raftCfgClose(pSyncNode->pRaftCfg);
      pSyncNode->pRaftCfg = NULL;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // the config is reopened further down once the node paths are set
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo (vgId was already assigned right after allocation)
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sInfo("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; from here on the node owns pFsm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts from the snapshot's lastApplyIndex when the FSM reports a larger one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    goto _error;
  }
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
    sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  syncNodeLogReplMgrInit(pSyncNode);

  // peer state
  syncNodePeerStateInit(pSyncNode);

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
  sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
         tsHeartbeatInterval, tsHeartbeatTimeout);

  return pSyncNode;

_error:
  // a pFsm still held by pSyncInfo was never moved into the node, so release it here
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1150 1151
/*
 * Raise the node's commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is
 * ahead. Does nothing when no FSM or no FpGetSnapshotInfo callback is registered.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapInfo = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);

  // commitIndex only ever moves forward here
  if (snapInfo.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapInfo.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1160
/*
 * Check that the log buffer is consistent with the log store, then commit the buffer up to
 * the larger of the node's commitIndex and the commit index reported by the log store.
 *
 * Returns 0 on success. Returns -1 when the buffer's endIndex does not directly follow the
 * store's last index (terrno = TSDB_CODE_WAL_LOG_INCOMPLETE), or when syncLogBufferCommit fails.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  // a non-empty log must end exactly one entry before the buffer's end index
  bool mismatch = (lastVer != -1) && (endIndex != lastVer + 1);
  if (mismatch) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);
  commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);

  int32_t code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex);
  if (code < 0) {
    return -1;
  }
  return 0;
}

/*
 * Start raft on the node and arm the ping timer.
 *
 * A single-replica node advances the term, becomes leader at once and appends a noop entry;
 * any other node starts as a follower.
 *
 * Returns the result of syncNodeStartPingTimer().
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  // start raft
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
  return code;
}

/*
 * Older start routine: same role selection as syncNodeStart(), but the single-replica path
 * additionally calls syncMaybeAdvanceCommitIndex() after appending the noop entry, and the
 * ping-timer result is only asserted, not returned.
 */
void syncNodeStartOld(SSyncNode* pSyncNode) {
  // start raft
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

B
Benguang Zhao 已提交
1221
/*
 * Start the node in stand-by mode: force the follower state, stop the heartbeat timers,
 * restart the election timer with TIMER_MAX_MS, and arm the ping timer.
 *
 * Returns the result of syncNodeStartPingTimer().
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
  return code;
}

M
Minghao Li 已提交
1237
/*
 * Prepare a node for closing.
 *
 * Waits (polling every 20 ms) until the FSM reports an apply-queue size of 0 or -1, then
 * stops and destroys the new-node snapshot receiver, and finally stops the election and
 * heartbeat timers.
 *
 * A NULL pSyncNode is accepted and ignored.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  // every step below dereferences the node, so guard once up front
  if (pSyncNode == NULL) {
    return;
  }

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      // 0 or -1 ends the wait
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);
}

1265
/* Release a heartbeat timer data record with taosMemoryFree(). */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1266

M
Minghao Li 已提交
1267
/*
 * Tear down a sync node and free it.
 *
 * Order: close the raft store; destroy the replication manager, response manager, vote
 * managers, index managers, log store, log buffer and raft config; stop the ping, election
 * and heartbeat timers; free the FSM; stop and destroy every snapshot sender and the
 * new-node receiver; finally free the node itself.
 *
 * A NULL pSyncNode is accepted and ignored.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // persistent raft state
  int32_t code = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(code == 0);
  pSyncNode->pRaftStore = NULL;

  // in-memory managers
  syncNodeLogReplMgrDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  // log store, log buffer and config
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  // snapshot senders: stop the running ones, then destroy
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA; ++idx) {
    SSyncSnapshotSender* pSender = (pSyncNode->senders)[idx];
    if (pSender == NULL) {
      continue;
    }

    sSTrace(pSender, "snapshot sender destroy while close, data:%p", pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    (pSyncNode->senders)[idx] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1326
/* Return the snapshot strategy stored in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1327

M
Minghao Li 已提交
1328 1329 1330
// timer control --------------
/*
 * Arm the ping timer and copy the user-side logic clock into the timer-side logic clock.
 * When the sync environment is not initialized only an error is logged.
 *
 * Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Stop the ping timer: increment the user-side logic clock, stop the timer and clear the
 * handle. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * The timer parameter block records the planned execution time and the current election
 * logic clock; the node's rid is passed to the callback as the opaque parameter. When the
 * sync environment is not initialized only an error is logged.
 *
 * Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  // fill in the parameter block before the timer is (re)armed
  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);

  return 0;
}

/*
 * Stop the election timer: increment the election logic clock, stop the timer and clear the
 * handle. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/*
 * Stop the election timer and arm it again with a timeout of `ms` milliseconds.
 * The results of the stop/start calls are not propagated; always returns 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1385 1386
/*
 * Restart the election timer with a fresh timeout.
 *
 * A stand-by node uses TIMER_MAX_MS; any other node uses a value from
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 *
 * Returns the result of syncNodeRestartElectTimer().
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return code;
}

M
Minghao Li 已提交
1401
/*
 * Arm the node-level heartbeat timer and copy the user-side logic clock into the timer-side
 * logic clock. When the sync environment is not initialized an error is logged instead.
 * The trace line is emitted in both cases.
 *
 * Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1415
/*
 * Start the per-peer heartbeat timer of every peer for which syncNodeGetHbTimer() returns a
 * timer. The node-level heartbeat timer path is compiled out (#if 0).
 *
 * Always returns 0; results of the individual timer starts are not propagated.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer for this peer
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1433 1434
/*
 * Stop the per-peer heartbeat timer of every peer for which syncNodeGetHbTimer() returns a
 * timer. The node-level heartbeat timer path is compiled out (#if 0).
 *
 * Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer for this peer
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return ret;
}

1452 1453 1454 1455 1456 1457
/*
 * Stop and then start the heartbeat timers. The results of both calls are ignored;
 * always returns 0.
 */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

M
Minghao Li 已提交
1458 1459 1460
// utils --------------
// Send pMsg to the node identified by destRaftId through the node's send callback.
// The raft id is translated to an endpoint set, the message content is converted
// with syncUtilMsgHtoN(), and the message is marked as not expecting a response.
// Returns 0 after handing the message to the callback (the callback's own result
// is not inspected). If no send callback is installed, the message content is
// freed and -1 is returned.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilRaftId2EpSet(destRaftId, &destEpSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // host -> network byte order
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&destEpSet, pMsg);

  return 0;
}

// Send pMsg to the node described by nodeInfo through the node's send callback.
// The node info is translated to an endpoint set, the message content is converted
// with syncUtilMsgHtoN(), and the message is marked as not expecting a response.
// Returns 0 after handing the message to the callback (the callback's own result
// is not inspected). If no send callback is installed, the message content is
// freed and -1 is returned, matching syncNodeSendMsgById(); previously this path
// kept the content allocated and reported success.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // host -> network byte order
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);

  return 0;
}

1492
// Report whether this node is a member of the given configuration.
// Membership is determined twice: once by comparing fqdn/port against
// pSyncNode->myNodeInfo, and once by comparing a raft id built from each config
// entry (with this node's vgId) against pSyncNode->myRaftId. Both answers are
// asserted to agree; the fqdn/port answer is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool matchedByEndpoint = false;
  for (int32_t k = 0; k < config->replicaNum && !matchedByEndpoint; ++k) {
    const SNodeInfo* pInfo = &config->nodeInfo[k];
    matchedByEndpoint = (strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0) &&
                        (pInfo->nodePort == pSyncNode->myNodeInfo.nodePort);
  }

  bool matchedByRaftId = false;
  for (int32_t k = 0; k < config->replicaNum && !matchedByRaftId; ++k) {
    const SNodeInfo* pInfo = &config->nodeInfo[k];

    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pInfo->nodeFqdn, pInfo->nodePort);
    candidate.vgId = pSyncNode->vgId;

    if (syncUtilSameId(&candidate, &(pSyncNode->myRaftId))) {
      matchedByRaftId = true;
    }
  }

  ASSERT(matchedByEndpoint == matchedByRaftId);
  return matchedByEndpoint;
}

1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531
// Return true when the two configurations differ in replica count, in myIndex,
// or in the fqdn/port of any replica at the same position; false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  // replica counts are equal here, so one bound covers both arrays
  for (int32_t pos = 0; pos < pOldCfg->replicaNum; ++pos) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[pos];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[pos];

    bool sameEndpoint = (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0) && (pBefore->nodePort == pAfter->nodePort);
    if (!sameEndpoint) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1532
// Apply a new replica configuration to this sync node.
//
// pNewConfig            - configuration to install (copied into pRaftCfg->cfg)
// lastConfigChangeIndex - log index recorded as the last config change
//
// If the new configuration equals the current one nothing is done. Otherwise the
// configuration is stored, the stand-by flag is updated, and the config index is
// recorded. When this node is part of the new configuration, its peer/replica
// tables, quorum, index managers, vote managers and snapshot senders are rebuilt,
// the configuration is persisted, and the node re-enters its current role
// (leader stays leader and appends a noop, anything else becomes follower).
// When this node is not part of the new configuration, the configuration is only
// persisted.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // membership is evaluated with myNodeInfo/myRaftId as they are before the refresh below
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerPos = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerPos] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerPos++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: a replica that already existed keeps its old sender, moved to its new slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
          sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

          (pSyncNode->senders)[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                  i, host, port, (pSyncNode->senders)[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
      } else {
        sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
      }
    }

    // free old: whatever was not moved to a new slot
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
           pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1708 1709 1710 1711
// raft state change --------------
// If term is greater than the stored current term: store it, become follower,
// and clear the recorded vote. Otherwise do nothing.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1719 1720 1721 1722 1723 1724
// Store term as the current term when it is greater than the stored one.
// Unlike syncNodeUpdateTerm(), the node's role and vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1725
// Step down with respect to newTerm.
//  - newTerm older than the current term: ignored.
//  - newTerm newer than the current term: store it, become follower, clear vote.
//  - newTerm equal to the current term: become follower unless already one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  if (pSyncNode->pRaftStore->currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
            pSyncNode->pRaftStore->currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
          pSyncNode->pRaftStore->currentTerm);

  if (pSyncNode->pRaftStore->currentTerm == newTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1751 1752
// Run syncRespCleanRsp() on this node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1753
// Switch this node to the follower role.
// debugStr is only used in the trace line emitted at the end.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a leader that steps down no longer knows who the leader is
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // role switch: followers do not send heartbeats
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // restart the election timeout
  syncNodeResetElectTimer(pSyncNode);

  // answer pending client requests
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, when it registered a callback
  if (pSyncNode->pFsm != NULL) {
    if (pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
      pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
    }
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1804
// Switch this node to the leader role.
// debugStr is only used in the info line emitted at the end.
//
// Every nextIndex slot is set to (last index + 1) and every matchIndex slot to
// SYNC_INDEX_INVALID, as in the BecomeLeader step of the TLA+ spec. The last
// index is looked up once (it also accounts for the snapshot, since the wal may
// have been deleted) instead of once per replica.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // maybe wal is deleted, so take the snapshot into account as well
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);

  // the slot of this node itself is overwritten too, which is harmless
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition.
// Asserts that the node is a candidate holding a majority of granted votes,
// becomes leader, appends a noop entry (a failure is only logged), and logs the
// resulting term / commit index / last log index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logTail >= 0);
  sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logTail);
}

// Older variant of the candidate -> leader transition.
// Asserts candidate state and a vote majority, becomes leader, appends a noop
// entry, tries to advance the commit index, and replicates when there is more
// than one replica.
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  if (pSyncNode->replicaNum <= 1) {
    return;
  }
  syncNodeReplicate(pSyncNode);
}

M
Minghao Li 已提交
1917 1918
// True when this node's vgId is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1919
// Reset all TSDB_MAX_REPLICA peer-state slots: last sent index becomes
// SYNC_INDEX_INVALID and last send time becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate transition.
// Asserts follower state, switches the state, and logs the current term,
// commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logTail);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition.
// Asserts leader state, becomes follower, and logs the current term, commit
// index and last log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logTail);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition.
// Asserts candidate state, becomes follower, and logs the current term, commit
// index and last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logTail);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1958 1959
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1960
// Record a vote for pRaftId in the raft store.
// Preconditions (asserted): term equals the stored current term, and no vote
// has been recorded yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(pSyncNode->pRaftStore->currentTerm == term);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1967
// simulate get vote from outside
M
Minghao Li 已提交
1968
// Vote for this node in the current term, then feed a locally built
// "vote granted" reply (from self, to self) into the vote managers, as if it had
// arrived from outside. If the reply cannot be built, the vote stays recorded
// but the vote managers are not updated.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);

  SRpcMsg selfReply = {0};
  if (syncBuildRequestVoteReply(&selfReply, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = selfReply.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = pSyncNode->pRaftStore->currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  // the reply only existed for the two calls above
  rpcFreeCont(selfReply.pCont);
}

M
Minghao Li 已提交
1986
// return if has a snapshot
M
Minghao Li 已提交
1987 1988
// True when the state machine provides a snapshot-info callback and the
// snapshot it reports has lastApplyIndex >= SYNC_INDEX_BEGIN.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  return snap.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
1999 2000
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
2001
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2002
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2003 2004
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2005 2006 2007 2008 2009 2010 2011
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2012 2013
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2014 2015
// Return the term that belongs to the node's last index.
//  - no snapshot: the log's last term
//  - snapshot present and the log reaches past it: the log's last term
//  - snapshot present and the log does not reach past it: the snapshot's term
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  }

  SyncIndex logTail = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logTail <= snap.lastApplyIndex) {
    return snap.lastApplyTerm;
  }

  return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm with the results of syncNodeGetLastIndex()
// and syncNodeGetLastTerm(). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = tailIndex;

  SyncTerm tailTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = tailTerm;

  return 0;
}
M
Minghao Li 已提交
2044

M
Minghao Li 已提交
2045
// return append-entries first try index
M
Minghao Li 已提交
2046 2047 2048 2049 2050
// Return the index right after the node's last index.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2051 2052
// if index > 0, return index - 1
// else, return -1
2053 2054 2055 2056 2057 2058 2059 2060 2061
// Return index - 1, but never less than SYNC_INDEX_INVALID.
// pSyncNode is not consulted.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex prev = index - 1;
  return (prev < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : prev;
}

M
Minghao Li 已提交
2062 2063 2064 2065
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2066 2067 2068 2069 2070 2071 2072 2073 2074
// Return the term of the entry preceding index.
//  - index <  SYNC_INDEX_BEGIN: SYNC_TERM_INVALID
//  - index == SYNC_INDEX_BEGIN: 0
//  - otherwise the entry at index - 1 is looked up in the log store's LRU cache
//    and, on a miss, fetched via syncLogGetEntry(). If the fetch fails, the
//    snapshot's term is used when the snapshot ends exactly at index - 1;
//    otherwise an error is logged and SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;

  if (h != NULL) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    SyncTerm preTerm = pPreEntry->term;

    // a cached entry is handed back to the cache; a fetched one is destroyed here
    if (h != NULL) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }

    return preTerm;
  }

  // entry not available from the log: fall back to the snapshot
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2125 2126 2127 2128

// get pre index and term of "index"
// Fill *pPreIndex and *pPreTerm with the results of syncNodeGetPreIndex() and
// syncNodeGetPreTerm() for index. Always returns 0, even when the term lookup
// yields SYNC_TERM_INVALID.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2133
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2134
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2135

S
Shengliang Guan 已提交
2136 2137 2138
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2139
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2140 2141
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2142
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2143 2144
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2145
    }
M
Minghao Li 已提交
2146

M
Minghao Li 已提交
2147
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2148 2149
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2150
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2151 2152
      rpcFreeCont(rpcMsg.pCont);
      return;
2153
    }
M
Minghao Li 已提交
2154

S
Shengliang Guan 已提交
2155
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2156
  }
M
Minghao Li 已提交
2157 2158
}

M
Minghao Li 已提交
2159
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2160
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2161

M
Minghao Li 已提交
2162 2163
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2164

2165
  if (pNode == NULL) return;
M
Minghao Li 已提交
2166 2167 2168 2169 2170

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2171

2172
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2173 2174 2175 2176
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2177

S
Shengliang Guan 已提交
2178
  SRpcMsg rpcMsg = {0};
2179 2180
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2181

S
Shengliang Guan 已提交
2182
  if (code != 0) {
M
Minghao Li 已提交
2183
    sError("failed to build elect msg");
M
Minghao Li 已提交
2184
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2185
    return;
M
Minghao Li 已提交
2186 2187
  }

S
Shengliang Guan 已提交
2188
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2189
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2190 2191 2192

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2193
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2194
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2195
    syncNodeRelease(pNode);
2196
    return;
M
Minghao Li 已提交
2197
  }
M
Minghao Li 已提交
2198 2199

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2200 2201
}

M
Minghao Li 已提交
2202
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2203
  if (!syncIsInit()) return;
2204

S
Shengliang Guan 已提交
2205 2206 2207 2208
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2209
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2210 2211 2212
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2213
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2214
        return;
2215
      }
M
Minghao Li 已提交
2216

2217
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2218 2219
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2220
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2221 2222
        rpcFreeCont(rpcMsg.pCont);
        return;
2223
      }
S
Shengliang Guan 已提交
2224 2225 2226 2227

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2228
    } else {
S
Shengliang Guan 已提交
2229 2230
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2231
    }
M
Minghao Li 已提交
2232 2233 2234
  }
}

2235
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2236
  int64_t hbDataRid = (int64_t)param;
2237
  int64_t tsNow = taosGetTimestampMs();
2238

2239 2240
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2241
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2242 2243
    return;
  }
2244

2245
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2246
  if (pSyncNode == NULL) {
2247
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2248
    sError("hb timer get pSyncNode NULL");
2249 2250 2251 2252 2253 2254 2255 2256
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2257
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2258 2259 2260
    return;
  }

M
Minghao Li 已提交
2261
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2262 2263
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2264
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2265 2266 2267
    return;
  }

M
Minghao Li 已提交
2268
  if (pSyncNode->pRaftStore == NULL) {
2269 2270
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2271
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2272 2273 2274
    return;
  }

M
Minghao Li 已提交
2275
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2276 2277

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2278 2279 2280
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2281
    if (timerLogicClock == msgLogicClock) {
2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2302
        pSyncMsg->timeStamp = tsNow;
2303 2304 2305 2306 2307 2308

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2309 2310
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2311 2312 2313 2314 2315 2316 2317 2318
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2319 2320
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2321 2322
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2323 2324 2325 2326
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2327
    } else {
M
Minghao Li 已提交
2328 2329
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2330 2331
    }
  }
2332 2333 2334

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2335 2336
}

2337 2338 2339 2340 2341
// Build a noop raft entry for the current term at the log store's write index,
// wrap it into a client-request message and enqueue it via syncEqMsg.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_SYN_NOT_LEADER when
// the node is not leader, -1 when the entry or the message cannot be built,
// and the enqueue callback's code when enqueueing fails.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  // only a leader may propose; anything else is reported as "not leader"
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  // the entry is destroyed here on both the success and failure paths
  syncEntryDestroy(pEntry);
  if (code != 0) {
    sError("failed to build noop client request");
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    // free the message on enqueue failure, as the timer callbacks in this file do
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2361 2362
// Deleter passed to taosLRUCacheInsert(): releases the cached value with
// taosMemoryFree(). "key" and "keyLen" are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2363 2364 2365 2366
// Insert pEntry into the log store's LRU cache, keyed by the entry index, with
// low priority and deleteCacheEntry() as the deleter. The cache handle is
// passed back through "h".
// Returns 0 when the insert status is TAOS_LRU_STATUS_OK, otherwise -1.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // charge = entry header plus payload
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                              deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2378 2379 2380
// Append pEntry to the node's log buffer and advance the buffer's match index.
// With more than one replica the function stops there. With a single replica
// it also updates the commit index to the new match index and commits the log
// buffer up to it.
// Returns 0 on success, -1 when the append or the commit fails.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // append to log buffer
  int32_t appendRet = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendRet < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index);
    return -1;
  }

  // proceed match index, replicating as needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  if (ths->replicaNum <= 1) {
    // single replica: commit right away
    (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2409
// Report whether heartbeat replies have timed out for at least "quorum" peers.
// A peer counts as timed out when its recorded receive time is older than
// tsHeartbeatTimeout. Peers whose receive time is 0 or -1 are ignored.
// Always false for a single-replica node.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int32_t timedOutPeers = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);

    if (hasRecvTime && (now - lastRecv > tsHeartbeatTimeout)) {
      timedOutPeers++;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450
// True when at least one of the node's snapshot senders exists and has its
// "start" flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->replicaNum; ++slot) {
    if (pSyncNode->senders[slot] == NULL) continue;
    if (pSyncNode->senders[slot]->start) return true;
  }

  return false;
}

// True when the node has a snapshot receiver whose "start" flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2451
// Build a noop entry for the current term at the log buffer's end index and
// append it through syncNodeAppend().
// Returns the result of syncNodeAppend(), or -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // propagate the append result instead of reporting success unconditionally
  int32_t ret = syncNodeAppend(ths, pEntry);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry. index:%" PRId64 "", ths->vgId, index);
  }
  return ret;
}

// Older noop-append path that writes straight to the log store.
// Builds a noop entry at the log store's write index. When the node is leader
// it appends the entry to the log store and inserts it into the entry cache.
// The entry is destroyed here unless a cache handle was obtained, in which
// case only the handle is released.
// Returns 0 on success and -1 when the log-store append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      // the entry was never cached, so it must be destroyed here to avoid a leak
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2494 2495
// Handle an incoming heartbeat message.
//
// A heartbeat reply addressed to the sender is always built and sent. Besides:
//  - when the message term equals the current term and this node is not
//    leader, the sender's receive time is recorded, the election timer is
//    reset, minMatchIndex is taken from the message, and (for followers) a
//    SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the leader's commit
//    index is enqueued;
//  - when the message term is >= the current term and this node is not a
//    follower, a SYNC_LOCAL_CMD_STEP_DOWN local command is enqueued.
//
// Returns 0, or -1 when the reply message cannot be built.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build heartbeat reply msg", ths->vgId);
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
        sError("vgId:%d, failed to build fc-commit msg", ths->vgId);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;
        // keep a local copy: the message must not be read once it is enqueued
        SyncIndex fcIndex = pSyncMsg->fcIndex;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          // nobody will consume the message, so free it here
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
      sError("vgId:%d, failed to build step-down msg", ths->vgId);
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          // log from the incoming heartbeat: the enqueued message must not be read any more
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pMsg->term);
        }
      } else {
        // nobody will consume the message, so free it here
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2576
// Handle a heartbeat reply from a peer.
// Looks up the log replication manager for the replying peer, records the
// receive time for that peer in pMatchIndex, and lets the replication manager
// process the reply.
// Returns -1 when no replication manager exists for the peer, otherwise the
// result of syncLogReplMgrProcessHeartbeatReply().
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // print the address as 0x followed by 16 zero-padded hex digits
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2596
// Older heartbeat-reply handler: logs the reply and records the receive time
// for the replying peer in pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            tbuf[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, tbuf);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, tbuf);

  // update last reply time, used to decide whether the other node is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);
  return 0;
}

S
Shengliang Guan 已提交
2612 2613
// Execute a local command message.
//  - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried in the message.
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT: update the commit index from the message and
//    commit the log buffer up to the node's commit index.
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      (void)syncNodeUpdateCommitIndex(ths, pCmd->fcIndex);
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

// Older local-command handler.
//  - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried in the message.
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT: run the follower commit up to fcIndex.
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pCmd->fcIndex);
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2649 2650 2651 2652 2653 2654 2655 2656 2657 2658
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2659

2660
// Handle a client request: build a raft entry for the current term at the log
// buffer's end index and, if this node is leader, append it.
//
// pRetIndex (optional) receives the index assigned to the entry when the node
// is leader.
// Returns the result of syncNodeAppend() on the leader path. Returns -1 when
// the entry cannot be built or the node is not leader; on the non-leader path
// the entry is destroyed here.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    // the entry was never handed to the log buffer, so release it here
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  int32_t code = syncNodeAppend(ths, pEntry);
  if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
    ASSERTS(false, "failed to append blocking msg");
  }
  return code;
}

2689 2690
// Older client-request path that writes straight to the log store.
//
// Builds a raft entry at the log store's write index. When the node is leader
// the entry is appended to the log store and cached; replication is started
// for multi-replica groups, or the commit index is advanced for a single
// replica. If the append fails on a multi-replica group, FpCommitCb is called
// with code -1 before returning.
//
// pRetIndex (optional) receives the entry index on the normal path, or
// SYNC_INDEX_INVALID when the entry cannot be built.
// Returns 0 on the normal path (also when the node is not leader), -1 when the
// entry cannot be built or the log-store append fails.
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  // everything below dereferences the entry, so bail out if it was not built
  if (pEntry == NULL) {
    sError("vgId:%d, failed to build raft entry for client request", ths->vgId);
    if (pRetIndex != NULL) {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }

      // the entry has not been cached yet (h is still NULL), so destroy it directly
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);

    // if multi replica, start replicate right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = pEntry->index;
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
M
Minghao Li 已提交
2778

S
Shengliang Guan 已提交
2779 2780 2781
// Map a sync state to its lowercase name; unrecognized values map to "unknown".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) return "follower";
  if (state == TAOS_SYNC_STATE_CANDIDATE) return "candidate";
  if (state == TAOS_SYNC_STATE_LEADER) return "leader";
  if (state == TAOS_SYNC_STATE_ERROR) return "error";
  if (state == TAOS_SYNC_STATE_OFFLINE) return "offline";
  return "unknown";
}
2795

2796
#if 0
// NOTE: compiled out. Kept for reference only.
// Apply a leader-transfer entry on a follower: when this node is the transfer
// target (matched by raft id, or by fqdn and port), restart the election timer
// with a 1 ms timeout, then notify the FSM through FpLeaderTransferCb if set.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  /*
    if (ths->vgId > 1) {
      sNTrace(ths, "I am vnode, can not do leader transfer");
      return 0;
    }
  */

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool sameId = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (sameId || sameNodeInfo) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2863
// Find this node inside pNewCfg and store its position in pNewCfg->myIndex.
// Each config slot is turned into a raft id (address from fqdn and port, vgId
// from this node) and compared with ths->myRaftId.
// Returns 0 when a matching slot is found, -1 otherwise.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t slot = 0; slot < pNewCfg->replicaNum; ++slot) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[slot].nodeFqdn, pNewCfg->nodeInfo[slot].nodePort);
    candidate.vgId = ths->vgId;

    if (!syncUtilSameId(&(ths->myRaftId), &candidate)) {
      continue;
    }

    pNewCfg->myIndex = slot;
    return 0;
  }

  return -1;
}

2878 2879 2880 2881
// True when all three conditions hold: the node has exactly one replica,
// syncUtilUserCommit() accepts the message type, and vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2882
// Apply log entries in [beginIndex, endIndex] to the FSM.
//
// NOTE: the function starts with ASSERT(false), so it is not expected to be
// called any more. The body is kept as is.
//
// Steps:
//  1. If the FSM reports a snapshot whose lastApplyIndex is >= beginIndex,
//     beginIndex is moved to just past the snapshot.
//  2. For every remaining index the entry is taken from the LRU cache, or read
//     from the log store on a cache miss. Entries that cannot be read are
//     logged and skipped.
//  3. For entries accepted by syncUtilUserCommit(), FpCommitCb is invoked,
//     except on a restored single-replica node with vgId != 1.
//  4. When the entry is the last one in the log store and restore has not
//     finished yet, FpRestoreFinishCb is invoked and restoreFinish is set.
//
// "flag" is passed to the FSM through SFsmCbMeta.flag.
// Returns 0, or -1 when ths is NULL.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  ASSERT(false);
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  int32_t code = 0;

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // start from NULL so the check below is well-defined even if the
        // log store fails without writing the out-parameter
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

          ths->pLogStore->cacheHit++;
          sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);

        } else {
          ths->pLogStore->cacheMiss++;
          sNTrace(ths, "miss cache index:%" PRId64, i);

          code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          if (code != 0 || pEntry == NULL) {
            sNError(ths, "get log entry error");
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
            continue;
          }
        }

        SRpcMsg rpcMsg = {0};
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
                  TMSG_INFO(pEntry->msgType));

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

            syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
          }
        }

#if 0
        // execute in pre-commit
        // leader transfer
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
        }
#endif

        // restore finish
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
            sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        // a cached entry is only released; an entry read from the log store is destroyed here
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestroy(pEntry);
        }
      }
    }
  }
  return 0;
}

// Tells whether pRaftId matches one of the first ths->replicaNum entries of
// ths->replicasId, using syncUtilSameId() for the comparison.
// Returns true on the first match, false when no entry matches.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool    found = false;
  int32_t idx = 0;

  // stop scanning as soon as a matching replica id has been seen
  while (!found && idx < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      found = true;
    }
    ++idx;
  }

  return found;
}

// Looks up the snapshot sender stored in ths->senders at the same slot as the
// replica id equal to pDestId (compared with syncUtilSameId()).
// All ths->replicaNum slots are visited without early exit, so if more than
// one slot matches, the sender of the last matching slot is returned.
// Returns NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }

  return pFound;
}
M
Minghao Li 已提交
3023

3024 3025
// Returns the address of the entry in ths->peerHeartbeatTimerArr that sits at
// the same slot as the replica id equal to pDestId (per syncUtilSameId()).
// The scan covers all ths->replicaNum slots; the last matching slot wins.
// Returns NULL when pDestId matches none of the replica ids.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pHit = NULL;

  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;  // not this replica
    }
    pHit = &ths->peerHeartbeatTimerArr[slot];
  }

  return pHit;
}

M
Minghao Li 已提交
3034 3035
// Returns the address of the entry in ths->peerStates that shares its slot
// with the replica id equal to pDestId (per syncUtilSameId()).
// Every one of the ths->replicaNum slots is checked, so the last matching
// slot determines the result. Returns NULL when there is no match.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pMatch = NULL;
  int32_t     slot = 0;

  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pMatch = &ths->peerStates[slot];
    }
    slot++;
  }

  return pMatch;
}

// Decides whether the append-entries message pMsg should be sent to pDestId.
//
// Returns false when:
//   - no peer state is found for pDestId (an error is logged), or
//   - the index to send (pMsg->prevLogIndex + 1) equals the peer's
//     lastSendIndex and the time elapsed since lastSendTime, measured with
//     taosGetTimestampMs(), is below SYNC_APPEND_ENTRIES_TIMEOUT_MS.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // send when the index differs from the last one sent, or when the last send
  // of this same index is at least the timeout old
  return pPeer->lastSendIndex != nextIndex || nowMs - pPeer->lastSendTime >= SYNC_APPEND_ENTRIES_TIMEOUT_MS;
}

M
Minghao Li 已提交
3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074
// Checks whether pSyncNode is currently allowed to change.
//
// Returns false, after logging an error, when any of the following holds:
//   1. pSyncNode->changing is already set;
//   2. commitIndex is at least SYNC_INDEX_BEGIN but differs from the index
//      reported by syncNodeGetLastIndex();
//   3. a peer has a snapshot sender whose `start` flag is set.
// Returns true when none of these conditions applies.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex logLastIndex = syncNodeGetLastIndex(pSyncNode);
    if (logLastIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pSnapSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    bool                 sending = (pSnapSender != NULL) && pSnapSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}