syncMain.c 87.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
45
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
46
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
47 48 49
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
50 51 52 53 54 55 56 57 58 59 60
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
61

62
// Open a sync node described by pSyncInfo and register it with syncNodeAdd().
// On success the timer baselines and the message callbacks are copied from
// pSyncInfo into the node and the registered rid is returned.
// Returns -1 if the node cannot be opened or registered.
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed, close the node that was just opened
    syncNodeClose(pNode);
    return -1;
  }

  // message callbacks
  pNode->msgcb = pSyncInfo->msgcb;

  // ping timer
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->pingBaseLine = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;

  return pNode->rid;
}
M
Minghao Li 已提交
83

B
Benguang Zhao 已提交
84
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
85
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
86
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
87
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
88 89 90 91
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
92
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
93
    goto _err;
M
Minghao Li 已提交
94
  }
M
Minghao Li 已提交
95

B
Benguang Zhao 已提交
96 97 98 99
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
100

B
Benguang Zhao 已提交
101 102
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
103

104 105 106
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
107 108
}

M
Minghao Li 已提交
109
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
110
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
111
  if (pSyncNode != NULL) {
112
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
113
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
114
    syncNodeRemove(rid);
M
Minghao Li 已提交
115 116 117
  }
}

M
Minghao Li 已提交
118 119
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
120 121 122
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
123 124 125
  }
}

126 127 128 129 130 131 132 133
// Run syncNodePostClose() on the node registered under rid, if it exists.
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) return;

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
134 135 136
// A new config is accepted only when this node is part of it and the
// replica count differs from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
139
// Apply a new replica configuration to the sync node registered under rid.
//
// Returns 0 when the config was applied or when the node's lastConfigIndex is
// already >= pNewCfg->lastIndex (nothing to do). Returns -1 when the node
// cannot be acquired, or when the new config is rejected by
// syncNodeCheckNewConfig() (terrno is set to TSDB_CODE_SYN_NEW_CONFIG_ERROR).
//
// The node is only released after the last access to it: logging that reads
// pSyncNode->vgId must happen before syncNodeRelease().
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // set last so that nothing above can overwrite the reported error
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // re-create the per-peer heartbeat timers against the new replica ids
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    // syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
174

S
Shengliang Guan 已提交
175 176 177 178
// Dispatch a sync RPC message to the handler matching pMsg->msgType.
//
// Returns the handler's result code. Returns -1 when the sync module is not
// initialized, when the node cannot be acquired, or when the message type is
// not handled here (terrno is then set to TSDB_CODE_MSG_NOT_PROCESSED).
//
// The failure log reads pSyncNode->vgId, so it is emitted before the node
// reference is released.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
235
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
236
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
237
  if (pSyncNode == NULL) return -1;
238

S
Shengliang Guan 已提交
239
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
240
  syncNodeRelease(pSyncNode);
241 242 243
  return ret;
}

C
cadem 已提交
244 245
// Switch this node to follower and answer the request with a success
// response that reuses the response buffer carried in pRpcMsg->info.
// Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;

  tmsgSendRsp(&reply);
  return 0;
}

258
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
259
  SSyncNode* pNode = syncNodeAcquire(rid);
260
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
261 262

  SRpcMsg rpcMsg = {0};
263
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
264 265 266
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
267
  if (ret == 1) {
268
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
269
    rpcSendResponse(&rpcMsg);
270 271
    return 0;
  } else {
272
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
273
    return -1;
274
  }
S
Shengliang Guan 已提交
275 276
}

M
Minghao Li 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
// Return the smallest match index recorded for any peer, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum, later peers can only lower it
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

293
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
294
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
295
  if (pSyncNode == NULL) {
296
    sError("sync begin snapshot error");
297 298
    return -1;
  }
299

300 301 302 303 304 305 306 307 308 309
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

310
  int32_t code = 0;
311
  int64_t logRetention = 0;
312

M
Minghao Li 已提交
313
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
314
    // mnode
315
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
316 317 318 319
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
320 321 322
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
323

C
cadem 已提交
324
  if (pSyncNode->totalReplicaNum > 1) {
D
dapan1121 已提交
325 326
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER &&
        pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
327 328 329 330
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
331
    }
332
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
333 334
  }

M
Minghao Li 已提交
335
_DEL_WAL:
336

M
Minghao Li 已提交
337
  do {
338 339 340 341 342 343 344 345 346 347 348
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

349
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
350 351 352 353 354 355 356 357
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
358

M
Minghao Li 已提交
359
      } else {
360 361
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
362
      }
363
    }
M
Minghao Li 已提交
364
  } while (0);
365

S
Shengliang Guan 已提交
366
  syncNodeRelease(pSyncNode);
367 368 369 370
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
371
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
372
  if (pSyncNode == NULL) {
373
    sError("sync end snapshot error");
374 375 376
    return -1;
  }

377 378 379 380
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
381
    if (code != 0) {
382
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
383
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
384 385
      return -1;
    } else {
S
Shengliang Guan 已提交
386
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
387 388
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
389
  }
390

S
Shengliang Guan 已提交
391
  syncNodeRelease(pSyncNode);
392 393 394
  return code;
}

M
Minghao Li 已提交
395
// Make the node registered under rid step down with term newTerm.
// Returns 0 on success, -1 if the node cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL != pNode) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

407
// A node can serve reads only when it is the leader and has finished
// restoring. When false is returned, terrno tells why:
//   NULL node    -> TSDB_CODE_SYN_INTERNAL_ERROR
//   not leader   -> TSDB_CODE_SYN_NOT_LEADER
//   restoring    -> TSDB_CODE_SYN_RESTORING
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  bool ready = false;

  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
  } else if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!pSyncNode->restoreFinish) {
    terrno = TSDB_CODE_SYN_RESTORING;
  } else {
    ready = true;
  }

  return ready;
}

// rid-based wrapper around syncNodeIsReadyForRead().
// Returns false if the node cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync ready for read error");
    return false;
  }

  bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);
  return isReady;
}
M
Minghao Li 已提交
439

440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
// rid-based wrapper around syncNodeSnapshotSending().
// Returns false if the node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  bool sending = false;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }

  return sending;
}

// rid-based wrapper around syncNodeSnapshotRecving().
// Returns false if the node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  bool receiving = false;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }

  return receiving;
}

M
Minghao Li 已提交
462 463
// Pick a peer and propose a leader transfer to it.
// Returns 0 without doing anything when there are no peers, or when this
// node is not the leader of a multi-replica group. Otherwise returns the
// result of syncNodeLeaderTransferTo().
// The first peer is the default target; with exactly two peers the one with
// the strictly larger match index is chosen.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return 0;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  SNodeInfo target = (pSyncNode->peersNodeInfo)[0];
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = (pSyncNode->peersNodeInfo)[1];
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, target);
}

M
Minghao Li 已提交
484 485
// Propose a leader-transfer entry that names newLeader as the next leader.
//
// Returns -1 when the group has a single replica or when the transfer
// message cannot be built; otherwise returns the result of syncNodePropose().
// The message content is freed here after the proposal has been submitted.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // pCont is dereferenced below, so a failed build must not be ignored
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

505 506
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
507

S
Shengliang Guan 已提交
508
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
509 510 511
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
512 513 514 515 516
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
517
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
518 519
  }

520
  return state;
M
Minghao Li 已提交
521 522
}

523
// Return the largest recorded config index that does not exceed
// snapshotLastApplyIndex. The first recorded config index is the starting
// value and is returned when no later entry qualifies.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

539 540
// Fill pEpSet with the endpoints of all non-learner replicas in the node's
// current raft config, so that a client can retry against another member.
//
// pEpSet->numOfEps is reset to 0 first and stays 0 if the node cannot be
// acquired. Endpoints are written densely into eps[0 .. numOfEps-1]:
// learners are skipped without leaving gaps, so numOfEps always describes
// exactly the slots that were filled. When at least one endpoint was added,
// inUse is set to (myIndex + 1) % numOfEps.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    // index by the number of endpoints collected so far, not by the config
    // index, otherwise a skipped learner leaves an unfilled slot behind
    SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}

S
Shengliang Guan 已提交
561
// rid-based wrapper around syncNodePropose().
// Returns -1 if the node cannot be acquired, otherwise the result of
// syncNodePropose().
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync propose error");
    return -1;
  }

  int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);
  return code;
}
M
Minghao Li 已提交
572

C
cadem 已提交
573 574 575 576 577 578 579
int32_t syncIsCatchUp(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

C
cadem 已提交
580
  int32_t isCatchUp = 0;
D
dapan1121 已提交
581
  if (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
C
cadem 已提交
582
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
D
dapan1121 已提交
583 584 585 586
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
          pSyncNode->pLogBuf->matchIndex);
C
cadem 已提交
587
    isCatchUp = 0;
D
dapan1121 已提交
588 589 590
  } else {
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
          pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);
C
cadem 已提交
591
    isCatchUp = 1;
C
cadem 已提交
592
  }
D
dapan1121 已提交
593

C
cadem 已提交
594
  syncNodeRelease(pSyncNode);
C
cadem 已提交
595 596 597 598 599 600 601 602 603 604 605
  return isCatchUp;
}

// Return the role this node has in its own raft config (the nodeInfo entry
// at myIndex). Returns -1 if the node cannot be acquired.
ESyncRole syncGetRole(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

  ESyncRole myRole = pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole;
  syncNodeRelease(pNode);
  return myRole;
}

S
Shengliang Guan 已提交
611
// Propose a client message on this node.
//
// Fails with -1 when the node is not the leader (terrno
// TSDB_CODE_SYN_NOT_LEADER), when restore has not finished, or when
// heartbeat replies have timed out (both terrno
// TSDB_CODE_SYN_PROPOSE_NOT_READY).
//
// Optimized one-replica path: the request is handled inline through
// syncNodeOnClientRequest(). On success the apply index and term are stored
// in pMsg->info.conn and 1 is returned; on failure terrno is
// TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned.
//
// Regular path: a response stub is registered, the message is wrapped into a
// client request and enqueued through syncEqMsg. The stub is removed again if
// building or enqueueing fails. Returns -1 if building fails, otherwise the
// enqueue result. When seq is non-NULL and the request was built, *seq
// receives the stub's sequence number.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  if (!pSyncNode->restoreFinish) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized because it is logged below even when the request fails
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    } else {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }
  } else {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
    SRpcMsg   rpcMsg = {0};
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
      return -1;
    }

    sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
      // nobody will answer this request, drop its stub again
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    }

    if (seq != NULL) *seq = seqNum;
    return code;
  }
}

S
Shengliang Guan 已提交
672
// Reset a per-peer heartbeat timer: no timer handle, zero counter and logic
// clock, interval taken from the node's heartbeat baseline, destination set
// to destId, and timestamp set to the current time. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // where the heartbeat goes and which callback fires it
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // fresh runtime state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
683
// Arm a per-peer heartbeat timer.
//
// The timer callback receives the rid of an SSyncHbTimerData record. The
// record referenced by pSyncTimer->hbDataRid is reused when it can be
// acquired; otherwise a new one is allocated and registered. The record is
// then refreshed with the node rid, the timer, its destination, the current
// logic clock and the next execution time before the timer is (re)set to
// fire after timerMS / HEARTBEAT_TICK_NUM.
//
// Returns 0 on success and also when the sync environment is not initialized
// (an error is logged in that case). Returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY if the timer data cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return ret;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // do not dereference a failed allocation
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return -1;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return ret;
}

S
Shengliang Guan 已提交
711
// Disarm a per-peer heartbeat timer: bump its logic clock, stop and clear
// the timer handle, and remove its timer-data record. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

721
// Restore the log store from the FSM snapshot when the log range does not
// line up with the snapshot's last applied index, i.e. when the log ends
// before that index or starts more than one entry after it.
// Returns 0 if no restore was needed or it succeeded, -1 if it failed.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snap = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  SyncIndex snapshotVer = snap.lastApplyIndex;
  SyncIndex logFirst = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex logLast = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logCoversSnapshot = (logLast >= snapshotVer) && (logFirst <= snapshotVer + 1);
  if (logCoversSnapshot) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), logLast, snapshotVer);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
741
// open/close --------------
S
Shengliang Guan 已提交
742 743
// Allocate and initialize a sync (raft) node from the given options.
//
// Steps, in order: create the sync directory if missing, create or load
// raft_config.json, refresh dnode info of every replica, build the log ring
// buffer, derive raft ids for self/peers/replicas, open the raft store, create
// vote/index managers and the log store, seed commitIndex from the FSM snapshot,
// restore the log store if needed, initialize timers, the response manager,
// snapshot senders/receiver, replication and peer state, and the log buffer.
// Timers are only initialized here, not started.
//
// pSyncInfo - input options. Side effects on it:
//             * syncCfg is overwritten with the on-disk config when that one is used;
//             * pFsm ownership moves to the node (pSyncInfo->pFsm becomes NULL);
//               on failure before that point pSyncInfo->pFsm is freed here.
// returns   - the new node, or NULL on failure. On failure everything built so far
//             is released through syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set vgId right away so that every log line below reports the real vgroup id
  // instead of the 0 left by calloc
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  // (vgId is assigned again on purpose, in case loading the cfg file touched the node)
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // from here on the node owns the FSM; syncNodeClose() frees it
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // seed commitIndex from the snapshot's last applied index when the FSM can report one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error so that the node, the FSM and the senders created so far are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // same as above: never return without releasing the partially built node
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by the caller's options only if the failure happened before the hand-over
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose() accepts NULL (allocation failure) and partially built nodes
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1072 1073
// Raise the node's commitIndex to the snapshot's lastApplyIndex when the FSM
// reports a larger value. Does nothing when the node has no FSM or the FSM has
// no FpGetSnapshotInfo callback.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  // commitIndex only ever moves forward here
  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1082
// Bring the node's commit state in line with the log store and commit the log
// buffer up to the resulting commitIndex.
//
// Fails with TSDB_CODE_WAL_LOG_INCOMPLETE when the log store is non-empty
// (last index != -1) and the log buffer's endIndex is not "last index + 1".
//
// returns - 0 on success, -1 on an incomplete log or a failed buffer commit
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  const SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  const SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  const SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  const bool storeHasEntries = (lastVer != -1);
  if (storeHasEntries && endIndex != lastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  // keep whichever commit index is further ahead: the in-memory one or the log store's
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) ? -1 : 0;
}

// Put the node into its initial raft role and start the ping timer.
//
// Role selection:
//   - configured as learner            -> learner
//   - voter in a single-replica group  -> advance the term, become leader, append a noop entry
//   - otherwise                        -> follower
//
// returns - the result of syncNodeStartPingTimer()
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  const SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  const bool      startAsLearner = (pCfg->nodeInfo[pCfg->myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER);

  if (startAsLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  } else {
    syncNodeBecomeFollower(pSyncNode, "first start");
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return code;
}

B
Benguang Zhao 已提交
1131
// Start the node in stand-by mode: follower state, heartbeat timers stopped,
// election timer re-armed with TIMER_MAX_MS, ping timer started.
//
// returns - -1 when either timer call reports a negative result, otherwise the
//           result of syncNodeStartPingTimer()
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  if (syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
1152
// First phase of shutting a node down: stop the elect, heartbeat and ping
// timers (in that order) and clean the pending responses held by the response
// manager. Nothing is freed here.
//
// The node, its FSM and the FSM's FpApplyQueueItems callback are asserted non-NULL.
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  // timers: elect, then heartbeat, then ping
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeStopPingTimer(pSyncNode);

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1170 1171 1172
// Release the node's snapshot receiver, if it has one: stop it when it is
// running, destroy it, and clear the pointer.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1183
// Free one heartbeat timer data object with taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1184

M
Minghao Li 已提交
1185
// Tear a sync node down and free it. Accepts NULL (no-op).
//
// Order: clean pending responses, stop ping/elect/heartbeat timers, destroy the
// replication managers, destroy the response/vote/index managers, the log store
// and the log buffer, stop and destroy every snapshot sender and the snapshot
// receiver, free the FSM, close the raft store, free the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers and replication
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplDestroy(pSyncNode);

  // managers and log storage; every pointer is cleared right after its object is destroyed
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // snapshot senders: empty slots are skipped
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[slot] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1243
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1244

M
Minghao Li 已提交
1245 1246 1247
// timer control --------------
// Arm the ping timer with the node's pingTimerMS and publish the user logic
// clock as the current ping logic clock.
//
// When the sync environment is not initialized only an error is logged.
//
// returns - always 0, including when the sync environment is not initialized
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: increment the user-side ping logic clock, stop the timer
// handle and clear it.
//
// returns - always 0
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
//
// Records `ms` as electTimerMS, fills electTimerParam (execute time = now + ms,
// current elect logic clock, back pointer to the node, no extra data) and
// resets the timer with the node's rid as callback parameter.
//
// When the sync environment is not initialized only an error is logged.
//
// returns - always 0
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  const int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  // the callback receives the node's rid, not the node pointer
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);

  return 0;
}

// Stop the election timer: increment the elect logic clock, stop the timer
// handle and clear it.
//
// returns - always 0
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and arm it again with `ms` milliseconds.
// The results of the two calls are not inspected.
//
// returns - always 0
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1302
// Re-arm the election timer with a fresh timeout.
//
// A stand-by node gets TIMER_MAX_MS; any other node gets a value drawn by
// syncUtilElectRandomMS() from [electBaseLine, 2 * electBaseLine].
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  const int32_t electMS = pSyncNode->raftCfg.isStandBy
                              ? TIMER_MAX_MS
                              : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1317
// Arm the node-wide heartbeat timer (pHeartbeatTimer) with heartbeatTimerMS and
// publish the user logic clock as the current heartbeat logic clock.
//
// When the sync environment is not initialized only an error is logged. The
// trace line is emitted in both cases.
//
// returns - always 0
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1331
// Start the heartbeat timer of every peer.
//
// Heartbeats are driven by one timer per peer; the node-wide pHeartbeatTimer is
// not touched here. Peers for which syncNodeGetHbTimer() returns NULL are skipped.
//
// returns - always 0
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1349 1350
// Stop the heartbeat timer of every peer.
//
// Only the per-peer timers are stopped; the node-wide pHeartbeatTimer is not
// touched here. Peers for which syncNodeGetHbTimer() returns NULL are skipped.
//
// returns - always 0
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return 0;
}

1368 1369 1370 1371 1372 1373
// Stop all peer heartbeat timers and start them again.
// The results of the two calls are not inspected.
//
// returns - always 0
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

1374 1375 1376 1377 1378 1379 1380 1381
// Send an rpc message to the peer identified by destRaftId.
//
// The destination is resolved by matching destRaftId->addr against the node's
// peersId; the first match selects the endpoint set. The message content is
// converted with syncUtilMsgHtoN(), marked as "no response expected" and passed
// to the node's syncSendMSg callback.
//
// destRaftId - raft id of the target peer
// pNode      - sending node
// pMsg       - message to send; on failure its content is released with rpcFreeCont()
// returns    - the callback's result; -1 when the peer is unknown or no send
//              callback is installed. On any negative result terrno is set to
//              TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* pDestEpSet = NULL;
  for (int32_t peer = 0; pDestEpSet == NULL && peer < pNode->peersNum; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pDestEpSet = &pNode->peersEpset[peer];
    }
  }

  int32_t    code = -1;
  const bool canSend = (pNode->syncSendMSg != NULL) && (pDestEpSet != NULL);
  if (canSend) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pDestEpSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  // the log reports terrno as it was before it is overwritten below
  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, pDestEpSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;

  return code;
}

1400
// Tell whether this node is a member of the given sync config.
//
// Membership is determined twice: once by comparing fqdn and port of every
// config entry with the node's own node info (b1), and once by comparing the
// raft id built from every config entry with the node's own raft id (b2).
// Both answers are asserted to agree; the fqdn/port answer is returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool b1 = false;
  bool b2 = false;

  // lookup by endpoint (fqdn + port); stops at the first match
  for (int32_t i = 0; !b1 && i < pCfg->totalReplicaNum; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    const bool       sameFqdn = (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0);
    if (sameFqdn && pInfo->nodePort == pNode->myNodeInfo.nodePort) {
      b1 = true;
    }
  }

  // lookup by raft id; stops at the first match
  for (int32_t i = 0; !b2 && i < pCfg->totalReplicaNum; ++i) {
    SRaftId raftId = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&raftId, &pNode->myRaftId)) {
      b2 = true;
    }
  }

  ASSERT(b1 == b2);
  return b1;
}

1428
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
C
cadem 已提交
1429
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1430
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
C
cadem 已提交
1431
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1432 1433 1434 1435
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
D
dapan1121 已提交
1436
    if (pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1437 1438 1439 1440 1441
  }

  return false;
}

M
Minghao Li 已提交
1442
/**
 * Install a new replica configuration on a sync node.
 *
 * If the new config equals the current one, nothing happens. Otherwise the
 * config and its change index are stored in raftCfg, the standby flag is
 * updated, and the change index is recorded via syncAddCfgIndex(). When this
 * node is part of the new config, all membership-derived state is rebuilt
 * (own identity, peers, replica ids, quorum, index/vote managers) and snapshot
 * senders are re-mapped onto the new replica slots: senders of surviving
 * replicas are reused, missing ones are created, leftovers are destroyed.
 * The config file is written in both cases.
 *
 * @param pSyncNode             node to reconfigure
 * @param pNewConfig            configuration to install (copied by value)
 * @param lastConfigChangeIndex log index associated with this config change
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // (arguments line up one-to-one with the conversion specifiers)
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, oldConfig.lastIndex, pNewConfig->lastIndex,
         pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // a slot may be empty (sender creation can fail, see below)
      if (oldSenders[i] != NULL) {
        sSTrace(oldSenders[i], "snapshot sender save old");
      }
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: move each surviving replica's old sender to its new slot
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // logged against the node: there is no sender object to log against
          sNError(pSyncNode, "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not re-used above
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
}

M
Minghao Li 已提交
1616 1617
// raft state change --------------
/**
 * Adopt a newer term.
 *
 * Does nothing unless `term` is strictly greater than the stored term.
 * Otherwise stores the term, switches the node to follower and clears the
 * recorded vote, in that order.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1627
/**
 * Store `term` if it is strictly greater than the current stored term.
 * Unlike syncNodeUpdateTerm(), the node's state and vote are left untouched.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1633
/**
 * Step down to follower in response to `newTerm`.
 *
 * - newTerm older than the stored term: ignored (trace only).
 * - newTerm newer: store it, become follower, clear the vote.
 * - newTerm equal: become follower only if not already one.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  const SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);

  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (newTerm == currentTerm) {
    // same term: only a state change may be needed
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1658 1659
/**
 * Invoke syncRespCleanRsp() on the node's response manager.
 * Called from syncNodeBecomeFollower() as the "send rsp to client" step.
 */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1660
/**
 * Switch the node into follower state.
 *
 * Steps, in order: drop the leader cache if the node was leader, zero
 * hbSlowNum, set the state, stop the heartbeat timer, trace, clean pending
 * responses, run the FSM's become-follower callback (if set), invalidate
 * minMatchIndex, reset the log buffer and reset the election timer.
 *
 * @param debugStr free-form reason appended to the trace line
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a former leader no longer knows who leads
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // enter follower state; followers do not send heartbeats
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // answer outstanding client requests
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine
  if (pSyncNode->pFsm != NULL) {
    if (pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
      pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
    }
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  syncNodeResetElectTimer(pSyncNode);
}

C
cadem 已提交
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713
/**
 * Switch the node into learner state.
 *
 * Zeroes hbSlowNum, sets the state, traces, runs the FSM's become-learner
 * callback (if set), invalidates minMatchIndex and resets the log buffer.
 * No timers are touched here.
 *
 * @param debugStr free-form reason appended to the trace line
 */
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine
  if (pSyncNode->pFsm != NULL) {
    if (pSyncNode->pFsm->FpBecomeLearnerCb != NULL) {
      pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
    }
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
}

M
Minghao Li 已提交
1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1732
/**
 * Switch the node into leader state.
 *
 * Records the leader timestamp and counters, sets the state and leader cache,
 * initialises every nextIndex to (last index + 1) and every matchIndex to
 * SYNC_INDEX_INVALID, re-initialises per-peer send state, stops a running
 * snapshot receiver, swaps the election timer for the heartbeat timer, sends
 * heartbeats immediately, runs the FSM's become-leader callback (if set),
 * invalidates minMatchIndex and resets the log buffer.
 *
 * @param debugStr free-form reason appended to the log line
 */
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index does not depend on the replica being initialised, so it is
  // queried once instead of once per loop iteration.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNInfo(pSyncNode, "become leader %s", debugStr);
}

/**
 * Candidate -> leader transition.
 *
 * Asserts the node is a candidate. If the granted votes do not form a
 * majority, logs an error and returns without changing state. Otherwise
 * becomes leader, appends a no-op entry (a failure is logged, not fatal) and
 * logs the resulting term / commit index / last index.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  const SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLast >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);
}

M
Minghao Li 已提交
1828 1829
/** Report whether this node's vgId is 1, the id this function treats as the mnode group. */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1830
/**
 * Reset the send bookkeeping of every peer slot: lastSendIndex becomes
 * SYNC_INDEX_INVALID and lastSendTime becomes 0.
 *
 * @return always 0
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  int32_t slot = 0;
  while (slot < slotCount) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

/**
 * Follower -> candidate transition.
 * Asserts the node is a follower, flips the state and logs term, commit index
 * and the log store's last index.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  const SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

  sNTrace(pSyncNode, "follower to candidate");
}

/**
 * Leader -> follower transition.
 * Asserts the node is a leader, delegates to syncNodeBecomeFollower() and
 * logs term, commit index and the log store's last index.
 */
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  const SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

  sNTrace(pSyncNode, "leader to follower");
}

/**
 * Candidate -> follower transition.
 * Asserts the node is a candidate, delegates to syncNodeBecomeFollower() and
 * logs term, commit index and the log store's last index.
 */
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  const SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLast);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1869 1870
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1871
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1872
  ASSERT(term == raftStoreGetTerm(pSyncNode));
1873 1874
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);
M
Minghao Li 已提交
1875

S
Shengliang Guan 已提交
1876
  raftStoreVote(pSyncNode, pRaftId);
M
Minghao Li 已提交
1877 1878
}

M
Minghao Li 已提交
1879
// simulate get vote from outside
1880 1881
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1882

S
Shengliang Guan 已提交
1883 1884
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1885
  if (ret != 0) return;
M
Minghao Li 已提交
1886

S
Shengliang Guan 已提交
1887
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1888 1889
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
1890
  pMsg->term = currentTerm;
M
Minghao Li 已提交
1891 1892 1893 1894
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1895
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1896 1897
}

M
Minghao Li 已提交
1898
// return if has a snapshot
M
Minghao Li 已提交
1899 1900
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1901
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1902 1903
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1904 1905 1906 1907 1908 1909 1910
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1911 1912
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1913
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1914
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1915 1916
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1917 1918 1919 1920 1921 1922 1923
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1924 1925
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1926 1927
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1928 1929
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1930
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1931 1932
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1933 1934
    }

M
Minghao Li 已提交
1935 1936 1937
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1938 1939 1940 1941
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1942
  } else {
M
Minghao Li 已提交
1943 1944
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1945
  }
M
Minghao Li 已提交
1946

M
Minghao Li 已提交
1947 1948 1949 1950 1951 1952 1953
  return lastTerm;
}

/**
 * Fetch the last index and last term (snapshot-aware) in one call.
 *
 * @param pLastIndex out: result of syncNodeGetLastIndex()
 * @param pLastTerm  out: result of syncNodeGetLastTerm()
 * @return always 0
 */
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  const SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = idx;

  const SyncTerm term = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = term;

  return 0;
}
M
Minghao Li 已提交
1956

M
Minghao Li 已提交
1957
// return append-entries first try index
M
Minghao Li 已提交
1958 1959 1960 1961 1962
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1963 1964
// if index > 0, return index - 1
// else, return -1
1965 1966 1967 1968 1969 1970 1971 1972 1973
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1974 1975 1976 1977
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1978 1979 1980 1981 1982 1983 1984 1985 1986
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1987 1988 1989
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1990
  SSyncRaftEntry* pPreEntry = NULL;
1991 1992 1993 1994 1995 1996 1997
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

1998
    pSyncNode->pLogStore->cacheHit++;
1999 2000 2001
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
2002
    pSyncNode->pLogStore->cacheMiss++;
2003 2004 2005 2006
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2007 2008 2009 2010 2011 2012

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2013
  if (code == 0) {
2014
    ASSERT(pPreEntry != NULL);
2015
    preTerm = pPreEntry->term;
2016 2017 2018 2019

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2020
      syncEntryDestroy(pPreEntry);
2021 2022
    }

2023 2024
    return preTerm;
  } else {
2025 2026 2027 2028
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2029 2030 2031 2032
      }
    }
  }

2033
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2034
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2035 2036
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2037 2038 2039 2040

/**
 * Fetch the index and term preceding `index` in one call.
 *
 * @param pPreIndex out: result of syncNodeGetPreIndex()
 * @param pPreTerm  out: result of syncNodeGetPreTerm()
 * @return always 0 (a failed term lookup is reported through *pPreTerm)
 */
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  const SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  const SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2045
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2046
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2047

S
Shengliang Guan 已提交
2048 2049 2050
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2051
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2052 2053
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2054
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2055 2056
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2057
    }
M
Minghao Li 已提交
2058

M
Minghao Li 已提交
2059
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2060 2061
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2062
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2063 2064
      rpcFreeCont(rpcMsg.pCont);
      return;
2065
    }
M
Minghao Li 已提交
2066

S
Shengliang Guan 已提交
2067
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2068
  }
M
Minghao Li 已提交
2069 2070
}

M
Minghao Li 已提交
2071
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2072
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2073

M
Minghao Li 已提交
2074 2075
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2076

2077
  if (pNode == NULL) return;
M
Minghao Li 已提交
2078 2079 2080 2081 2082

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2083

2084
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2085 2086 2087 2088
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2089

S
Shengliang Guan 已提交
2090
  SRpcMsg rpcMsg = {0};
2091 2092
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2093

S
Shengliang Guan 已提交
2094
  if (code != 0) {
M
Minghao Li 已提交
2095
    sError("failed to build elect msg");
M
Minghao Li 已提交
2096
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2097
    return;
M
Minghao Li 已提交
2098 2099
  }

S
Shengliang Guan 已提交
2100
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2101
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2102 2103 2104

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2105
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2106
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2107
    syncNodeRelease(pNode);
2108
    return;
M
Minghao Li 已提交
2109
  }
M
Minghao Li 已提交
2110 2111

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2112 2113
}

M
Minghao Li 已提交
2114
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2115
  if (!syncIsInit()) return;
2116

S
Shengliang Guan 已提交
2117
  SSyncNode* pNode = param;
C
cadem 已提交
2118
  if (pNode->totalReplicaNum > 1) {
S
Shengliang Guan 已提交
2119 2120
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2121
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2122 2123 2124
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2125
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2126
        return;
2127
      }
M
Minghao Li 已提交
2128

2129
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2130 2131
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2132
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2133 2134
        rpcFreeCont(rpcMsg.pCont);
        return;
2135
      }
S
Shengliang Guan 已提交
2136 2137 2138 2139

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2140
    } else {
S
Shengliang Guan 已提交
2141 2142
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2143
    }
M
Minghao Li 已提交
2144 2145 2146
  }
}

2147
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2148
  int64_t hbDataRid = (int64_t)param;
2149
  int64_t tsNow = taosGetTimestampMs();
2150

2151 2152
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
C
cadem 已提交
2153
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2154 2155
    return;
  }
2156

2157
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2158
  if (pSyncNode == NULL) {
2159
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2160
    sError("hb timer get pSyncNode NULL");
2161 2162 2163 2164 2165 2166 2167 2168
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2169
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2170 2171 2172
    return;
  }

M
Minghao Li 已提交
2173
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2174 2175
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2176
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2177 2178 2179
    return;
  }

C
cadem 已提交
2180
  sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
2181

C
cadem 已提交
2182
  if (pSyncNode->totalReplicaNum > 1) {
M
Minghao Li 已提交
2183 2184 2185
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2186
    if (timerLogicClock == msgLogicClock) {
2187 2188 2189 2190 2191 2192
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2193 2194
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2195 2196 2197
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2198
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2199
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2200
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2201
        pSyncMsg->privateTerm = 0;
2202
        pSyncMsg->timeStamp = tsNow;
2203 2204 2205 2206 2207 2208

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
C
cadem 已提交
2209
        sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
2210 2211
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2212 2213 2214
      } else {
      }

M
Minghao Li 已提交
2215 2216
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2217 2218
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2219 2220 2221 2222
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2223
    } else {
M
Minghao Li 已提交
2224 2225
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2226 2227
    }
  }
2228 2229 2230

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2231 2232
}

D
dapan1121 已提交
2233 2234 2235 2236
// Deleter callback passed to taosLRUCacheInsert by syncCacheEntry.
// Frees the cached value; the key, its length and the user data are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
2237

2238 2239 2240 2241
// Inserts a raft entry into the log store's LRU cache, keyed by the entry index.
// The size reported to the cache is sizeof(*pEntry) + pEntry->dataLen, the entry is inserted
// with low priority, and deleteCacheEntry is registered as its deleter.
// `h` is forwarded to taosLRUCacheInsert as the handle output.
// Returns 0 when the cache reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   entrySize = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              entrySize, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW, NULL);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2253
// Appends a raft entry to the node's log buffer and advances the match index.
// Steps, in order:
//   1. reject (and destroy) entries whose payload is shorter than an SMsgHead;
//   2. append to the log buffer; on failure run the FSM with terrno, destroy the entry, fail;
//   3. proceed the log buffer to obtain the new match index;
//   4. with a single replica, update the commit index to the match index and commit right away.
// Returns 0 on success and -1 on any failure.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // append to log buffer
  const bool appended = (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) >= 0);
  if (!appended) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // proceed match index, with replicating on needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  int32_t result = 0;
  if (ths->replicaNum <= 1) {
    // single replica: there is nobody to replicate to, so commit up to the match index immediately
    (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      result = -1;
    }
  }

  return result;
}

2294
// Reports whether heartbeat replies have timed out for at least `quorum` peers.
// A peer counts as timed out when its recorded receive time is neither 0 nor -1 and is more
// than tsHeartbeatTimeout behind the current timestamp. Always false when totalReplicaNum is 1.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) {
    return false;
  }

  int32_t timedOutPeers = 0;
  int64_t nowMs = taosGetTimestampMs();

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // 0 and -1 are skipped by the original logic and never count as a timeout
    if (lastRecv != 0 && lastRecv != -1 && nowMs - lastRecv > tsHeartbeatTimeout) {
      timedOutPeers++;
    }
    ++peer;
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2317 2318 2319
// Returns true when any of the node's snapshot senders exists and has its `start` flag set.
// A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return false;
  }

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    if (pSyncNode->senders[idx] == NULL) {
      continue;
    }
    if (pSyncNode->senders[idx]->start) {
      return true;
    }
  }

  return false;
}

// Returns true when the node has a new-node snapshot receiver whose `start` flag is set.
// A NULL node or a NULL receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2336
// Builds a no-op raft entry at the end index of the log buffer, stamped with the current term,
// and appends it through syncNodeAppend.
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built; otherwise
// returns the result of syncNodeAppend, so an append failure is no longer reported as success.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend destroys pEntry itself on its failure paths, so there is nothing to clean up here
  return syncNodeAppend(ths, pEntry);
}

// Legacy variant of the no-op append that writes straight to the log store.
// Builds a no-op entry at the log store's write index; when this node is the leader the entry
// is appended to the log store and then offered to the entry cache.
// The entry is destroyed here unless the cache returned a handle for it, in which case only the
// handle is released.
// Returns 0 on success, -1 when the entry cannot be built or the log store append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    // same handling as syncNodeAppendNoop, instead of relying on an ASSERT
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry is still owned by this function at this point; free it to avoid a leak
      syncEntryDestroy(pEntry);
      return -1;
    }

    // best effort: when caching fails `h` stays NULL and the entry is destroyed below
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2379 2380
// Handles a heartbeat received from a peer.
//   - Heartbeats whose sender is not in this node's raft group are dropped (returns 0).
//   - A heartbeat reply is built and, at the end, sent back to the sender.
//   - A learner adopts a higher term carried by the heartbeat.
//   - A non-leader receiving a heartbeat for the current term records the receive time, takes
//     over the sender's minMatchIndex, and (as follower or learner) enqueues a local commit
//     command; the election timer is reset after the reply has been sent.
//   - A leader receiving a heartbeat with a term >= its own enqueues a local step-down command.
// Returns 0 on success and -1 when the reply message cannot be built.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  SRpcMsg rpcMsg = {0};
  int32_t buildCode = syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
  if (buildCode != 0 || rpcMsg.pCont == NULL) {
    // never dereference a reply that was not built
    sError("vgId:%d, failed to build heartbeat reply since %s", ths->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sTrace("vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
         DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    raftStoreSetTerm(ths, pMsg->term);
    currentTerm = pMsg->term;
  }

  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      int32_t cmdCode = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
      if (cmdCode != 0 || rpcMsgLocalCmd.pCont == NULL) {
        sError("vgId:%d, failed to build commit msg from heartbeat since %s", ths->vgId, terrstr());
        if (rpcMsgLocalCmd.pCont != NULL) {
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd =
            (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->commitIndex = pMsg->commitIndex;
        pSyncMsg->currentTerm = pMsg->term;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(),
                   code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                   pMsg->commitIndex, pMsg->term);
          }
        } else {
          // no queue to hand the command to: release it here instead of leaking it
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEADER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    int32_t cmdCode = syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
    if (cmdCode != 0 || rpcMsgLocalCmd.pCont == NULL) {
      sError("vgId:%d, failed to build step-down msg since %s", ths->vgId, terrstr());
      if (rpcMsgLocalCmd.pCont != NULL) {
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->currentTerm = pMsg->term;
      pSyncMsg->commitIndex = pMsg->commitIndex;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
        }
      } else {
        // no queue to hand the command to: release it here instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}

2475
// Handles a heartbeat reply from a peer.
// Looks up the log replication manager for the replying peer (returns -1 if there is none),
// logs the reply with its transit time, records the receive time for the peer in the match
// index manager, and returns the result of syncLogReplProcessHeartbeatReply.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" PRIx64 prints the address as 16 zero-padded hex digits; the previous "0x016%" form
    // emitted a literal "016" followed by an unpadded value
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}

2495
// Legacy heartbeat-reply handler: logs the reply together with its transit time and records
// the receive time for the replying peer in the match index manager. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            tbuf[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, tbuf);

  int64_t nowMs = taosGetTimestampMs();
  int64_t transitMs = nowMs - pReply->timeStamp;
  syncLogRecvHeartbeatReply(ths, pReply, transitMs, tbuf);

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}

S
Shengliang Guan 已提交
2511 2512
// Executes a local command that was enqueued by this node itself.
//   - SYNC_LOCAL_CMD_STEP_DOWN: step down to the term carried by the command.
//   - SYNC_LOCAL_CMD_FOLLOWER_CMT / SYNC_LOCAL_CMD_LEARNER_CMT: if the log buffer is not empty,
//     update the commit index when the command's term equals the last matched term, then commit
//     the log buffer up to the node's commit index.
//   - anything else is logged as an error.
// Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->currentTerm);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT && pCmd->cmd != SYNC_LOCAL_CMD_LEARNER_CMT) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferIsEmpty(ths->pLogBuf)) {
    sError("vgId:%d, sync log buffer is empty.", ths->vgId);
    return 0;
  }

  SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
  if (pCmd->currentTerm == lastMatchTerm) {
    (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
  }

  // the commit is attempted even when the commit index was not updated above
  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }

  return 0;
}

M
Minghao Li 已提交
2538 2539 2540 2541 2542 2543 2544 2545 2546 2547
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2548

2549
// Turns a client request into a raft entry and appends it when this node is the leader.
// The entry is built at the end index of the log buffer with the current term, either from the
// client-request payload (TDMT_SYNC_CLIENT_REQUEST) or from the rpc message itself.
// On the leader, *pRetIndex (when provided) receives the entry index and the result of
// syncNodeAppend is returned. On any other state the entry is destroyed and -1 is returned.
// Returns -1 as well when the entry cannot be built.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    // only the leader may append client requests
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = index;
  }

  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2582 2583 2584
// Maps a sync state to its lower-case display name; states not listed map to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    default:
      break;
  }

  return name;
}
2600

2601
// Finds this node inside a new configuration and stores its position in pNewCfg->myIndex.
// Each configured node is turned into a raft id (address from SYNC_ADDR, vgId of this node)
// and compared with this node's own raft id.
// Returns 0 on the first match, -1 when this node is not part of the configuration.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {0};
    candidate.addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return -1;
}

2617 2618 2619 2620
// True when the node has exactly one replica, the message type is a user commit according to
// syncUtilUserCommit, and the vgroup id is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

2621
// Returns true when pRaftId equals one of the first totalReplicaNum entries of ths->replicasId.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;

  while (idx < ths->totalReplicaNum) {
    if (syncUtilSameId(&((ths->replicasId)[idx]), pRaftId)) {
      return true;
    }
    ++idx;
  }

  return false;
}

// Returns the snapshot sender stored at the replica slot whose raft id equals pDestId,
// or NULL when no slot matches. All slots are scanned, so the last matching slot wins.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = (ths->senders)[slot];
    }
    ++slot;
  }

  return pFound;
}
M
Minghao Li 已提交
2639

2640 2641
// Returns a pointer to the peer heartbeat timer at the replica slot whose raft id equals
// pDestId, or NULL when no slot matches. All slots are scanned, so the last matching slot wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = &((ths->peerHeartbeatTimerArr)[slot]);
    }
    ++slot;
  }

  return pFound;
}

M
Minghao Li 已提交
2650 2651
// Returns a pointer to the peer state at the replica slot whose raft id equals pDestId,
// or NULL when no slot matches. All slots are scanned, so the last matching slot wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int32_t     slot = 0;

  while (slot < ths->totalReplicaNum) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      pFound = &((ths->peerStates)[slot]);
    }
    ++slot;
  }

  return pFound;
}

// Decides whether an append-entries message should be sent to a peer.
// Returns false when the peer has no state (logged as a possibly dropped replica), or when the
// same index (prevLogIndex + 1) was already sent to it less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex indexToSend = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  const bool sentRecently =
      (pPeer->lastSendIndex == indexToSend) && (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !sentRecently;
}

M
Minghao Li 已提交
2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690
// Reports whether the node is currently allowed to change.
// The change is refused (with an error log) when:
//   - the node is already flagged as changing;
//   - the commit index is at least SYNC_INDEX_BEGIN but differs from the last log index;
//   - a snapshot sender for any peer has its `start` flag set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}