syncMain.c 87.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
C
cadem 已提交
40
#include "syncUtil.h"
M
Minghao Li 已提交
41

M
Minghao Li 已提交
42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
// Create a sync node from pSyncInfo, register it with syncNodeAdd and copy the
// ping/election/heartbeat intervals and the message callbacks onto it.
// Returns the ref id assigned by syncNodeAdd, or -1 if the node could not be
// opened or registered (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;
  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
  // message callbacks
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
88
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
89 90 91 92
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

127 128 129 130 131 132 133 134
// Run syncNodePostClose on the node identified by rid, if it can be acquired.
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
135 136 137
// A new config is acceptable only if this node is a member of it and the
// replica count changes by at most one relative to the current node.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
140
// Apply pNewCfg to the node identified by rid.
// - Returns 0 without changes when the node's lastConfigIndex is already >= pNewCfg->lastIndex.
// - Returns -1 (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR) when syncNodeCheckNewConfig rejects it.
// - Otherwise updates the config index, performs the config change and, on a
//   leader, re-initializes the per-peer heartbeat timers; returns 0.
// Every access to pSyncNode (including logging) happens before syncNodeRelease:
// the node must not be touched once the reference has been dropped.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // set after the release, as before, so the release cannot overwrite it
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // membership may have changed: rebuild the heartbeat timer for every slot
    syncNodeStopHeartbeatTimer(pSyncNode);
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }
    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
175

S
Shengliang Guan 已提交
176 177 178 179
// Dispatch an incoming sync message to the matching handler of the node
// identified by rid. Returns the handler's result, or -1 when the sync env is
// not initialized, the rid cannot be acquired, or the message type is unknown
// (terrno = TSDB_CODE_MSG_NOT_PROCESSED).
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // Log while the reference is still held; pSyncNode must not be read after release.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
236
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
237
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
238
  if (pSyncNode == NULL) return -1;
239

S
Shengliang Guan 已提交
240
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
241
  syncNodeRelease(pSyncNode);
242 243 244
  return ret;
}

C
cadem 已提交
245 246
// Force this node into the follower role, then reply to the request with
// code 0, reusing the response buffer/length carried in pRpcMsg->info.
// Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.info = pRpcMsg->info;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.code = 0;
  tmsgSendRsp(&reply);

  return 0;
}

259
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
260
  SSyncNode* pNode = syncNodeAcquire(rid);
261
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
262 263

  SRpcMsg rpcMsg = {0};
264
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
265 266 267
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
268
  if (ret == 1) {
269
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
270
    rpcSendResponse(&rpcMsg);
271 272
    return 0;
  } else {
273
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
274
    return -1;
275
  }
S
Shengliang Guan 已提交
276 277
}

M
Minghao Li 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
// Smallest match index over all peers, or SYNC_INDEX_INVALID when there are
// no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;
  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[p]);
    // the first peer seeds the minimum; later peers only lower it
    if (p == 0 || idx < lowest) {
      lowest = idx;
    }
  }
  return lowest;
}

294
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
295
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
296
  if (pSyncNode == NULL) {
297
    sError("sync begin snapshot error");
298 299
    return -1;
  }
300

301 302 303 304 305 306 307 308 309 310
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

311
  int32_t code = 0;
312
  int64_t logRetention = 0;
313

M
Minghao Li 已提交
314
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
315
    // mnode
316
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
317 318 319 320
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
321 322 323
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
324

C
cadem 已提交
325 326 327
  if (pSyncNode->totalReplicaNum > 1) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER 
        && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
328 329 330 331
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
332
    }
333
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
334 335
  }

M
Minghao Li 已提交
336
_DEL_WAL:
337

M
Minghao Li 已提交
338
  do {
339 340 341 342 343 344 345 346 347 348 349
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

350
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
351 352 353 354 355 356 357 358
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
359

M
Minghao Li 已提交
360
      } else {
361 362
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
363
      }
364
    }
M
Minghao Li 已提交
365
  } while (0);
366

S
Shengliang Guan 已提交
367
  syncNodeRelease(pSyncNode);
368 369 370 371
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
372
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
373
  if (pSyncNode == NULL) {
374
    sError("sync end snapshot error");
375 376 377
    return -1;
  }

378 379 380 381
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
382
    if (code != 0) {
383
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
384
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
385 386
      return -1;
    } else {
S
Shengliang Guan 已提交
387
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
388 389
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
390
  }
391

S
Shengliang Guan 已提交
392
  syncNodeRelease(pSyncNode);
393 394 395
  return code;
}

M
Minghao Li 已提交
396
// Make the node identified by rid step down for newTerm.
// Returns 0, or -1 if rid cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

408
// A node can serve reads only when it is the leader and has finished restoring.
// On false, terrno tells why: INTERNAL_ERROR (NULL node), NOT_LEADER, or RESTORING.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!pSyncNode->restoreFinish) {
    terrno = TSDB_CODE_SYN_RESTORING;
  } else {
    return true;
  }
  return false;
}

// rid-based wrapper around syncNodeIsReadyForRead; false if rid cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  bool       ready = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    ready = syncNodeIsReadyForRead(pNode);
    syncNodeRelease(pNode);
  } else {
    sError("sync ready for read error");
  }
  return ready;
}
M
Minghao Li 已提交
440

441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
// rid-based wrapper around syncNodeSnapshotSending; false if rid cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

// rid-based wrapper around syncNodeSnapshotRecving; false if rid cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  bool       receiving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    receiving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return receiving;
}

M
Minghao Li 已提交
463 464
// Pick a peer and propose a leader transfer to it.
// Fails with TSDB_CODE_SYN_ONE_REPLICA when there are no peers. Returns 0
// without doing anything unless this node is leader with replicaNum > 1.
// With exactly two peers, the one with the larger match index is chosen
// (peer 0 on a tie); otherwise peer 0 is chosen.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[1]);
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[target]);
}

M
Minghao Li 已提交
486 487
// Propose a leader-transfer entry naming newLeader.
// Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA on a single-replica group.
// Returns -1 if the transfer message cannot be built.
// Otherwise returns syncNodePropose's result.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // The build result used to be discarded; on failure rpcMsg.pCont is not a
  // valid message and dereferencing it below would crash.
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

508 509
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
510

S
Shengliang Guan 已提交
511
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
512 513 514
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
515 516 517 518 519
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
520
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
521 522
  }

523
  return state;
M
Minghao Li 已提交
524 525
}

526
// Largest recorded config index that does not exceed snapshotLastApplyIndex.
// The search is seeded with configIndexArr[0], so that entry is returned if
// no larger qualifying entry exists (even if it exceeds the bound).
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
  const SyncIndex* pIdxArr = pSyncNode->raftCfg.configIndexArr;
  SyncIndex        best = pIdxArr[0];

  for (int32_t k = 0; k < pSyncNode->raftCfg.configIndexCount; ++k) {
    SyncIndex candidate = pIdxArr[k];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

542 543
// Fill pEpSet with the endpoints of all non-learner members of the current
// config. inUse is set to the slot after myIndex (mod numOfEps). pEpSet is left
// with numOfEps == 0 if rid cannot be acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  // capacity of the fixed eps[] array inside SEpSet
  const int32_t maxEps = (int32_t)(sizeof(pEpSet->eps) / sizeof(pEpSet->eps[0]));

  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
    if (pEpSet->numOfEps >= maxEps) break;

    // Pack eps densely. Indexing by i would leave uninitialized holes (and
    // could run past the array) whenever a learner is skipped.
    SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}

S
Shengliang Guan 已提交
564
// rid-based wrapper around syncNodePropose; -1 if rid cannot be acquired.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  int32_t    result = -1;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    result = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
  } else {
    sError("sync propose error");
  }
  return result;
}
M
Minghao Li 已提交
575

C
cadem 已提交
576 577 578 579 580 581 582
int32_t syncIsCatchUp(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

C
cadem 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596
  int32_t isCatchUp = 0;
  if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
      pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
      pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, 
                                  pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, 
                                  pSyncNode->pLogBuf->matchIndex);
    isCatchUp = 0;
  }
  else{
    sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, 
                                  pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
                                  pSyncNode->pLogBuf->matchIndex);
    isCatchUp = 1;
C
cadem 已提交
597 598 599
  }
  
  syncNodeRelease(pSyncNode);
C
cadem 已提交
600 601 602 603 604 605 606 607 608 609 610 611 612 613
  return isCatchUp;
}

ESyncRole syncGetRole(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

  ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
  
  syncNodeRelease(pSyncNode);
  return role;
C
cadem 已提交
614 615
}

S
Shengliang Guan 已提交
616
// Propose pMsg through the node.
// Fails with -1 (terrno set) when the node is not leader, is a vnode that has
// not finished restoring, or has a heartbeat-reply timeout.
// Optimized one-replica path: applied directly via syncNodeOnClientRequest and
// returns 1 with applyIndex/applyTerm filled into pMsg->info.conn.
// Otherwise: the request is wrapped as a client-request message and enqueued
// through syncEqMsg; the response-manager sequence number is stored in *seq
// (if seq != NULL) and the enqueue result is returned.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized: the failure log below prints retIndex, which the callee may
    // not have written when it fails.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
678
// Reset a per-peer heartbeat timer descriptor for destId.
// The interval comes from the node's hbBaseLine, the callback is
// syncNodeEqPeerHeartbeatTimer, and the logic clock starts at 0.
// The timer itself is not armed here. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
689
// Arm the per-peer heartbeat timer.
// Reuses the SSyncHbTimerData registered under pSyncTimer->hbDataRid when it
// can be acquired; otherwise allocates and registers a new one.
// The record is refreshed with the node rid, destination, logic clock and next
// execution time. The timer is then reset to fire after timerMS / HEARTBEAT_TICK_NUM.
// Returns 0, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) if the record cannot be
// allocated. Only logs if the sync env is not initialized.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // previously dereferenced unconditionally
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);

    // the rid (not the pointer) is handed to the callback as its parameter
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
717
// Stop a per-peer heartbeat timer and unregister its timer-data record.
// The logic clock is bumped before the timer is stopped (presumably so a
// callback already in flight can tell it is stale — confirm against the
// timer callback). Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

727
// Make sure the log store is contiguous with the FSM snapshot. If the
// snapshot's lastApplyIndex is beyond the last log entry, or the first log
// entry is past lastApplyIndex + 1, the log store is rebuilt from the snapshot.
// Returns 0 on success (or when no restore is needed), -1 if the restore fails.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);

  SyncIndex snapVer = snapshot.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool contiguous = (lastVer >= snapVer) && (firstVer <= snapVer + 1);
  if (contiguous) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapVer) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapVer);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
747
// open/close --------------
S
Shengliang Guan 已提交
748 749
/*
 * Create and initialize a sync node described by pSyncInfo.
 *
 * Steps:
 *   1. Make sure the sync directory exists.
 *   2. Load raft_config.json, or create it from pSyncInfo when it does not exist yet.
 *   3. Refresh dnode endpoint info.
 *   4. Build the peer and replica id tables.
 *   5. Create the raft state containers (votes, next/match index, log store, log ring buffer).
 *   6. Initialize the timers, snapshot senders/receiver, replication and peer state.
 *
 * Side effects on pSyncInfo:
 *   - When the on-disk config is kept, pSyncInfo->syncCfg is overwritten with it.
 *   - pSyncInfo->pFsm is moved into the node and set to NULL.
 *
 * Returns the new node, or NULL on failure. Every failure path goes through _error: the FSM is
 * freed from pSyncInfo if it has not been moved yet, and syncNodeClose() releases whatever part of
 * the node was already built. terrno carries the reason on some paths.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId before anything is logged; otherwise the config-file messages below report vgId:0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId: every configured node except myIndex is a peer
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm. From here on the FSM belongs to the node; on failure syncNodeClose() frees it.
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts at the FSM snapshot's lastApplyIndex when one is available
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // Use the common error path. A bare return here would leak the node, its FSM and all
      // resources created above, including the senders already stored in pSyncNode->senders.
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft roles are entered later, in syncNodeStart()
  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // The FSM is still owned by pSyncInfo if we failed before it was moved into the node.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose() returns early on NULL. The other error paths already rely on it to release a
  // partially initialized node.
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1078 1079
/*
 * Advance commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
 * Does nothing if no FSM or no FpGetSnapshotInfo callback is registered; never moves commitIndex back.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  bool canQuerySnapshot = pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL;
  if (!canQuerySnapshot) {
    return;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  if (pSyncNode->commitIndex < snapshot.lastApplyIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1088
/*
 * Reconcile the node's commit index with the log store and commit buffered entries up to it.
 *
 * Fails with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the log store is non-empty
 * (lastVer != -1) but the ring buffer's endIndex is not lastVer + 1.
 *
 * Returns 0 on success, -1 on mismatch or when syncLogBufferCommit() fails.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommit = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool walIncomplete = (lastVer != -1) && (endIndex != lastVer + 1);
  if (walIncomplete) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  // never move the in-memory commit index backwards
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommit);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) ? -1 : 0;
}

/*
 * Enter the initial raft role and start the ping timer.
 *
 * A node whose own config entry has role TAOS_SYNC_ROLE_LEARNER becomes a learner. A single-replica
 * node bumps its term, becomes leader and appends a no-op entry. Any other node starts as a follower.
 *
 * Returns the result of syncNodeStartPingTimer().
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  bool isLearner =
      pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER;

  if (isLearner) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");
    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  } else {
    syncNodeBecomeFollower(pSyncNode, "first start");
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return ret;
}

B
Benguang Zhao 已提交
1138
/*
 * Start the node in standby mode.
 *
 * The node becomes a follower, heartbeats are stopped, and the election timer is restarted with
 * TIMER_MAX_MS. The ping timer is then started.
 *
 * Returns -1 if either timer call fails, otherwise the ping timer result.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // a very long election timeout keeps the standby node from campaigning
  int32_t longElectMS = TIMER_MAX_MS;
  if (syncNodeRestartElectTimer(pSyncNode, longElectMS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  if (pingRet < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return pingRet;
}

M
Minghao Li 已提交
1159
/*
 * First phase of shutdown: stop the elect, heartbeat and ping timers (in that order), then clean
 * pending responses held by the response manager.
 * Requires a non-NULL node with an FSM that provides FpApplyQueueItems.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  syncNodeStopElectTimer(pSyncNode);      // elect timer
  syncNodeStopHeartbeatTimer(pSyncNode);  // heartbeat timer
  syncNodeStopPingTimer(pSyncNode);       // ping timer

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1177 1178 1179
/*
 * Post-close cleanup: stop the new-node snapshot receiver if it is running, destroy it, and clear
 * the pointer. Does nothing if no receiver exists.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1190
/* Release heartbeat timer data allocated with the taos allocator. */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1191

M
Minghao Li 已提交
1192
/*
 * Tear down a sync node and free it. A NULL node is ignored.
 *
 * Order:
 *   1. Drop pending responses, stop the ping, elect and heartbeat timers, destroy replication state.
 *   2. Destroy each owned component and clear its pointer.
 *   3. Stop (if running) and destroy the snapshot senders and the new-node receiver.
 *   4. Free the FSM, close the raft store, and free the node.
 *
 * pSyncNode must not be used after this call.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // quiesce
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplDestroy(pSyncNode);

  // raft bookkeeping
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // snapshot senders: stop an in-progress transfer before destroying the sender
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++idx) {
    if (pSyncNode->senders[idx] == NULL) {
      continue;
    }
    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[idx]);

    if (snapshotSenderIsStart(pSyncNode->senders[idx])) {
      snapshotSenderStop(pSyncNode->senders[idx], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[idx]);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver for a newly added node
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1250
/* Return the snapshot strategy recorded in the node's raft configuration. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1251

M
Minghao Li 已提交
1252 1253 1254
// timer control --------------
/*
 * (Re)arm the ping timer with pingTimerMS and sync the timer's logic clock with the user clock.
 * If the sync environment is not initialized only an error is logged. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Stop the ping timer. The user logic clock is advanced first; the start routine copies it into
 * pingTimerLogicClock. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

/*
 * Arm the election timer to fire after `ms` milliseconds.
 *
 * The absolute deadline and the current electTimerLogicClock are recorded in electTimerParam. The
 * timer is registered with the node's rid rather than the node pointer. If the sync environment is
 * not initialized only an error is logged. Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Stop the election timer. The logic clock is advanced before cancelling; the start routine records
 * that clock in electTimerParam. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

/* Stop, then re-arm the election timer with `ms`. The inner results are ignored; always returns 0. */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1309
/*
 * Restart the election timer with a fresh timeout.
 * Standby nodes use TIMER_MAX_MS. Others draw a random value from syncUtilElectRandomMS() over
 * [electBaseLine, 2 * electBaseLine].
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1324
/*
 * Arm the node-level heartbeat timer with heartbeatTimerMS and sync its logic clock with the user clock.
 * If the sync environment is not initialized only an error is logged. The trace line is emitted in
 * both cases. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1338
/*
 * Start the per-peer heartbeat timers. Each peer in peersId that has a timer slot (as reported by
 * syncNodeGetHbTimer) is started; peers without one are skipped. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1356 1357
/*
 * Stop the per-peer heartbeat timers for every peer in peersId that has a timer slot.
 * Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return 0;
}

1375 1376 1377 1378 1379 1380
/* Stop, then start all peer heartbeat timers. The inner results are ignored; always returns 0. */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1381 1382 1383 1384 1385 1386 1387 1388
/*
 * Send pMsg to the peer whose raft id address equals destRaftId->addr.
 *
 * The body is converted with syncUtilMsgHtoN and the message is marked noResp before sending. If the
 * peer is unknown, no send callback is registered, or the callback returns < 0, the error is logged,
 * the message body is freed and terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR.
 *
 * Returns the send callback's result, or -1 if nothing was sent.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; epSet == NULL && i < pNode->peersNum; ++i) {
    if (pNode->peersId[i].addr == destRaftId->addr) {
      epSet = &pNode->peersEpset[i];
    }
  }

  int32_t code = -1;
  bool    canSend = (pNode->syncSendMSg != NULL) && (epSet != NULL);
  if (canSend) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return code;
}

1407
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked two ways: by fqdn/port endpoint, and by raft id (SYNC_ADDR + vgId). The two
 * results are asserted to agree; the endpoint result is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool byEndpoint = false;
  for (int32_t i = 0; !byEndpoint && i < pCfg->totalReplicaNum; ++i) {
    byEndpoint = strcmp(pCfg->nodeInfo[i].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0 &&
                 pCfg->nodeInfo[i].nodePort == pNode->myNodeInfo.nodePort;
  }

  bool byRaftId = false;
  for (int32_t i = 0; !byRaftId && i < pCfg->totalReplicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    byRaftId = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  ASSERT(byEndpoint == byRaftId);
  return byEndpoint;
}

1435
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
C
cadem 已提交
1436
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1437
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
C
cadem 已提交
1438
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1439 1440 1441 1442
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
C
cadem 已提交
1443
    if(pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1444 1445 1446 1447 1448
  }

  return false;
}

M
Minghao Li 已提交
1449
// Apply pNewConfig (whose config-change entry sits at lastConfigChangeIndex) to this node.
//
// Returns early, with only an info log, when syncIsConfigChanged() reports no
// difference from the current config. Otherwise the config is installed, the
// config index recorded and configChangeNum incremented. Then:
//   - node present in the new config: isStandBy cleared; myNodeInfo/myRaftId,
//     peer and replica tables, quorum, index and vote managers rebuilt; every
//     surviving replica's snapshot sender moved to its new slot, missing senders
//     created and leftover ones destroyed; config persisted; the node re-enters
//     its current role (a leader also appends a no-op entry);
//   - node dropped (in old config, not in new): isStandBy set, config persisted;
//   - otherwise: config persisted.
//
// @param pSyncNode             node being reconfigured
// @param pNewConfig            configuration to apply (copied into raftCfg.cfg)
// @param lastConfigChangeIndex log index recorded as the last config change
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // Membership is evaluated against this node's identity as it was before the change.
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // Log begin config change. Every argument must match its conversion specifier;
  // lastIndex is cast explicitly so it always matches PRId64.
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, (int64_t)oldConfig.lastIndex,
         (int64_t)pNewConfig->lastIndex, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // Save the old replica ids and snapshot senders so surviving replicas keep their sender.
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // A slot may hold no sender (creation can fail below); the sender log macros take the sender object.
      if (oldSenders[i] != NULL) {
        sSTrace(oldSenders[i], "snapshot sender save old");
      }
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId (every configured node except myself)
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // Reset snapshot senders: detach every slot first.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // Move each surviving replica's old sender to that replica's new slot.
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      for (int32_t k = 0; k < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++k) {
        if (!syncUtilSameId(&pSyncNode->replicasId[i], &oldReplicasId[k]) || oldSenders[k] == NULL) {
          continue;
        }

        sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                pSyncNode->replicasId[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

        pSyncNode->senders[i] = oldSenders[k];
        oldSenders[k] = NULL;  // ownership moved; must not be destroyed below

        // the sender now serves slot i of the new replica table
        int32_t oldReplicaIndex = pSyncNode->senders[i]->replicaIndex;
        pSyncNode->senders[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                oldReplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], true);
        break;
      }
    }

    // Create senders for slots that did not inherit one.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] != NULL) {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
        continue;
      }

      pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
      if (pSyncNode->senders[i] == NULL) {
        // will be created later while send snapshot; the sender is NULL here, so log through the node
        sNError(pSyncNode, "snapshot sender create failed while reconfig, replica-index:%d", i);
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
      }
    }

    // Free old senders that were not carried over to a new slot.
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
}

M
Minghao Li 已提交
1622 1623
// raft state change --------------
// Adopt a term greater than the stored one: persist it, become follower, and clear the vote.
// Terms not greater than the current term are ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1633
// Persist term if it is greater than the stored term, without changing role or vote.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  bool isNewer = term > raftStoreGetTerm(pSyncNode);
  if (!isNewer) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1639
// Step down in response to newTerm.
//   newTerm <  current: ignored (trace only).
//   newTerm == current: become follower unless already one.
//   newTerm >  current: persist newTerm, become follower, clear the vote.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);

  if (newTerm < currentTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (newTerm == currentTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1664 1665
// On a leadership change, hand the node's response manager to syncRespCleanRsp().
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1666
// Switch this node to follower state.
//
// If the node was leader, its cached leader id (set to itself when it became
// leader) is cleared. Also: resets hbSlowNum, stops the heartbeat timer, cleans
// pending responses, invokes FpBecomeFollowerCb when provided, invalidates
// minMatchIndex, resets the log buffer and resets the election timer.
//
// @param debugStr free-form reason appended to the trace line
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  pSyncNode->hbSlowNum = 0;
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // state change, then stop leader-only heartbeats
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasCallback = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeFollowerCb != NULL);
  if (hasCallback) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // a follower relies on the election timer to detect a missing leader
  syncNodeResetElectTimer(pSyncNode);
}

C
cadem 已提交
1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
// Switch this node to learner state.
//
// Resets hbSlowNum, invokes FpBecomeLearnerCb when provided, invalidates
// minMatchIndex and resets the log buffer. Timers and the leader cache are not touched.
//
// @param debugStr free-form reason appended to the trace line
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
  pSyncNode->hbSlowNum = 0;

  sNTrace(pSyncNode, "become learner %s", debugStr);

  bool hasCallback = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeLearnerCb != NULL);
  if (hasCallback) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
}

M
Minghao Li 已提交
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1738
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1739 1740
  pSyncNode->leaderTime = taosGetTimestampMs();

1741
  pSyncNode->becomeLeaderNum++;
1742
  pSyncNode->hbrSlowNum = 0;
1743

1744 1745 1746
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1747
  // state change
M
Minghao Li 已提交
1748
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1749 1750

  // set leader cache
M
Minghao Li 已提交
1751 1752
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1753
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
1754 1755 1756
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
1757
    ASSERT(code == 0);
1758
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1759 1760
  }

S
Shengliang Guan 已提交
1761
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1762 1763
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1764 1765 1766
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1767 1768 1769
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1770
#if 0
1771 1772
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1773
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1774
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1775 1776
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
1777
      }
1778
    }
1779
    (pMySender->privateTerm) += 100;
1780
  }
M
Minghao Li 已提交
1781
#endif
1782

1783
  // close receiver
1784
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
S
Shengliang Guan 已提交
1785
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
1786 1787
  }

M
Minghao Li 已提交
1788
  // stop elect timer
M
Minghao Li 已提交
1789
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1790

M
Minghao Li 已提交
1791 1792
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1793

M
Minghao Li 已提交
1794 1795
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1796

1797 1798 1799 1800 1801
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1802 1803 1804
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

B
Benguang Zhao 已提交
1805 1806 1807
  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

M
Minghao Li 已提交
1808
  // trace log
1809
  sNInfo(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1810 1811 1812
}

// Candidate -> leader transition.
//
// Asserts the node is a candidate and does nothing (error log) unless the
// granted votes form a majority. On success it becomes leader, appends a no-op
// entry (a failure is only logged) and logs term, commit index and last index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t noopCode = syncNodeAppendNoop(pSyncNode);
  if (noopCode < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLastIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);
}

M
Minghao Li 已提交
1834 1835
// Report whether this sync node belongs to vgId 1, which this file treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}

M
Minghao Li 已提交
1836
// Reset the per-slot peer send state (last sent index and time) for every replica slot.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  for (int32_t slot = 0; slot < slotCount; ++slot) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }
  return 0;
}

// Follower -> candidate transition: asserts FOLLOWER state, sets CANDIDATE, and logs term/commit/last index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  const char* reason = "follower to candidate";
  sNTrace(pSyncNode, reason);
}

// Leader -> follower transition: asserts LEADER state, becomes follower, and logs term/commit/last index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  const char* reason = "leader to follower";

  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, reason);

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, reason);
}

// Candidate -> follower transition: asserts CANDIDATE state, becomes follower, and logs term/commit/last index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  const char* reason = "candidate to follower";

  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, reason);

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, reason);
}

M
Minghao Li 已提交
1875 1876
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1877
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1878
  ASSERT(term == raftStoreGetTerm(pSyncNode));
1879 1880
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);
M
Minghao Li 已提交
1881

S
Shengliang Guan 已提交
1882
  raftStoreVote(pSyncNode, pRaftId);
M
Minghao Li 已提交
1883 1884
}

M
Minghao Li 已提交
1885
// simulate get vote from outside
1886 1887
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1888

S
Shengliang Guan 已提交
1889 1890
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1891
  if (ret != 0) return;
M
Minghao Li 已提交
1892

S
Shengliang Guan 已提交
1893
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1894 1895
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
1896
  pMsg->term = currentTerm;
M
Minghao Li 已提交
1897 1898 1899 1900
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1901
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1902 1903
}

M
Minghao Li 已提交
1904
// return if has a snapshot
M
Minghao Li 已提交
1905 1906
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1907
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1908 1909
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1910 1911 1912 1913 1914 1915 1916
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1917 1918
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1919
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1920
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1921 1922
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1923 1924 1925 1926 1927 1928 1929
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1930 1931
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1932 1933
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1934 1935
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1936
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1937 1938
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1939 1940
    }

M
Minghao Li 已提交
1941 1942 1943
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1944 1945 1946 1947
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1948
  } else {
M
Minghao Li 已提交
1949 1950
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1951
  }
M
Minghao Li 已提交
1952

M
Minghao Li 已提交
1953 1954 1955 1956 1957 1958 1959
  return lastTerm;
}

// Fill *pLastIndex and *pLastTerm with the last index/term, snapshot included.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex index = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  term = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = index;
  *pLastTerm = term;
  return 0;
}
M
Minghao Li 已提交
1962

M
Minghao Li 已提交
1963
// return append-entries first try index
M
Minghao Li 已提交
1964 1965 1966 1967 1968
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1969 1970
// if index > 0, return index - 1
// else, return -1
1971 1972 1973 1974 1975 1976 1977 1978 1979
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1980 1981 1982 1983
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1984 1985 1986 1987 1988 1989 1990 1991 1992
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1993 1994 1995
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1996
  SSyncRaftEntry* pPreEntry = NULL;
1997 1998 1999 2000 2001 2002 2003
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

2004
    pSyncNode->pLogStore->cacheHit++;
2005 2006 2007
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
2008
    pSyncNode->pLogStore->cacheMiss++;
2009 2010 2011 2012
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2013 2014 2015 2016 2017 2018

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2019
  if (code == 0) {
2020
    ASSERT(pPreEntry != NULL);
2021
    preTerm = pPreEntry->term;
2022 2023 2024 2025

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2026
      syncEntryDestroy(pPreEntry);
2027 2028
    }

2029 2030
    return preTerm;
  } else {
2031 2032 2033 2034
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2035 2036 2037 2038
      }
    }
  }

2039
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2040
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2041 2042
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2043 2044 2045 2046

// Fill *pPreIndex and *pPreTerm with the index and term of the entry preceding index.
// Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2051
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2052
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2053

S
Shengliang Guan 已提交
2054 2055 2056
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2057
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2058 2059
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2060
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2061 2062
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2063
    }
M
Minghao Li 已提交
2064

M
Minghao Li 已提交
2065
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2066 2067
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2068
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2069 2070
      rpcFreeCont(rpcMsg.pCont);
      return;
2071
    }
M
Minghao Li 已提交
2072

S
Shengliang Guan 已提交
2073
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2074
  }
M
Minghao Li 已提交
2075 2076
}

M
Minghao Li 已提交
2077
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2078
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2079

M
Minghao Li 已提交
2080 2081
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2082

2083
  if (pNode == NULL) return;
M
Minghao Li 已提交
2084 2085 2086 2087 2088

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2089

2090
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2091 2092 2093 2094
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2095

S
Shengliang Guan 已提交
2096
  SRpcMsg rpcMsg = {0};
2097 2098
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2099

S
Shengliang Guan 已提交
2100
  if (code != 0) {
M
Minghao Li 已提交
2101
    sError("failed to build elect msg");
M
Minghao Li 已提交
2102
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2103
    return;
M
Minghao Li 已提交
2104 2105
  }

S
Shengliang Guan 已提交
2106
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2107
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2108 2109 2110

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2111
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2112
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2113
    syncNodeRelease(pNode);
2114
    return;
M
Minghao Li 已提交
2115
  }
M
Minghao Li 已提交
2116 2117

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2118 2119
}

M
Minghao Li 已提交
2120
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2121
  if (!syncIsInit()) return;
2122

S
Shengliang Guan 已提交
2123
  SSyncNode* pNode = param;
C
cadem 已提交
2124
  if (pNode->totalReplicaNum > 1) {
S
Shengliang Guan 已提交
2125 2126
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2127
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2128 2129 2130
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2131
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2132
        return;
2133
      }
M
Minghao Li 已提交
2134

2135
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2136 2137
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2138
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2139 2140
        rpcFreeCont(rpcMsg.pCont);
        return;
2141
      }
S
Shengliang Guan 已提交
2142 2143 2144 2145

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2146
    } else {
S
Shengliang Guan 已提交
2147 2148
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2149
    }
M
Minghao Li 已提交
2150 2151 2152
  }
}

2153
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2154
  int64_t hbDataRid = (int64_t)param;
2155
  int64_t tsNow = taosGetTimestampMs();
2156

2157 2158
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
C
cadem 已提交
2159
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2160 2161
    return;
  }
2162

2163
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2164
  if (pSyncNode == NULL) {
2165
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2166
    sError("hb timer get pSyncNode NULL");
2167 2168 2169 2170 2171 2172 2173 2174
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2175
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2176 2177 2178
    return;
  }

M
Minghao Li 已提交
2179
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2180 2181
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2182
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2183 2184 2185
    return;
  }

C
cadem 已提交
2186
  sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
2187

C
cadem 已提交
2188
  if (pSyncNode->totalReplicaNum > 1) {
M
Minghao Li 已提交
2189 2190 2191
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2192
    if (timerLogicClock == msgLogicClock) {
2193 2194 2195 2196 2197 2198
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2199 2200
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2201 2202 2203
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2204
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2205
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2206
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2207
        pSyncMsg->privateTerm = 0;
2208
        pSyncMsg->timeStamp = tsNow;
2209 2210 2211 2212 2213 2214

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
C
cadem 已提交
2215
        sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
2216 2217
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2218 2219 2220
      } else {
      }

M
Minghao Li 已提交
2221 2222
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2223 2224
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2225 2226 2227 2228
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2229
    } else {
M
Minghao Li 已提交
2230 2231
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2232 2233
    }
  }
2234 2235 2236

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2237 2238
}

2239 2240
/* Deleter registered with the LRU cache for raft entries: frees the cached value; key/keyLen are unused. */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2241 2242 2243 2244
/*
 * Insert pEntry into the log store's LRU cache, keyed by pEntry->index.
 *
 * The charge is sizeof(*pEntry) + pEntry->dataLen. deleteCacheEntry is
 * registered as the deleter, and *h receives the handle from
 * taosLRUCacheInsert().
 *
 * Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2256
/*
 * Append a raft entry to the node's log buffer and advance the match index.
 *
 * Entries whose payload is shorter than an SMsgHead are rejected and destroyed.
 * If the log buffer refuses the entry, the FSM is notified with the current
 * terrno and the entry is destroyed.
 *
 * On success the entry is not freed here (presumably owned by the log buffer
 * from then on — confirm in syncPipeline). syncLogBufferProceed() is then
 * called. With a single replica, the commit index is moved up to the proceeded
 * index and committed immediately. With more replicas nothing else happens here.
 *
 * Returns 0 on success, -1 on any failure.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // Hand the entry to the log buffer; on rejection report it to the FSM and drop it.
  if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  SyncIndex proceededIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  // Only a single-replica group commits straight away from here.
  if (ths->replicaNum <= 1) {
    (void)syncNodeUpdateCommitIndex(ths, proceededIndex);
    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2297
/*
 * Report whether at least `quorum` peers have gone quiet.
 *
 * A peer counts as quiet when its last receive time recorded in pMatchIndex is
 * older than tsHeartbeatTimeout ms. Peers whose recorded time is 0 or -1 are
 * not counted. A single-replica group never times out.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) return false;

  int64_t now = taosGetTimestampMs();
  int32_t quietPeers = 0;
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);
    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      ++quietPeers;
    }
  }

  return quietPeers >= pSyncNode->quorum;
}

2320 2321 2322
/* True when any replica slot has a snapshot sender whose `start` flag is set; false for a NULL node. */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->totalReplicaNum; ++slot) {
    if (pSyncNode->senders[slot] == NULL) continue;
    if (pSyncNode->senders[slot]->start) return true;
  }
  return false;
}

/* True when the node has a new-node snapshot receiver whose `start` flag is set; false for a NULL node. */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2339
/*
 * Append a no-op entry at the log buffer's current end index, stamped with the
 * current raft term.
 *
 * Returns:
 *   - -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the entry cannot be built;
 *   - otherwise the result of syncNodeAppend(), which destroys the entry itself
 *     when it rejects it or cannot buffer it.
 *
 * Previously the append result was discarded and 0 was always returned.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return syncNodeAppend(ths, pEntry);
}

/*
 * Legacy no-op append that writes straight to the log store.
 *
 * Builds a no-op entry at the store's write index with the current term.
 * Only a leader appends it and then tries to cache it.
 *
 * Ownership of pEntry at the end of the function:
 *   - if it was cached, the LRU handle is released;
 *   - otherwise the entry is destroyed here.
 *
 * Returns 0 on success. Returns -1 if:
 *   - the entry cannot be built (terrno = TSDB_CODE_OUT_OF_MEMORY), replacing
 *     the former ASSERT;
 *   - the log store append fails. The entry is now destroyed on this path;
 *     previously it leaked.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // Not cached yet, so the entry is still owned here.
      syncEntryDestroy(pEntry);
      return -1;
    }

    // Best-effort: if caching fails, h stays NULL and the entry is destroyed below.
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2382 2383
/*
 * Handle a heartbeat message from a peer.
 *
 * Heartbeats whose sender is not in this raft group are dropped (return 0).
 * Otherwise a heartbeat reply carrying this node's current term is always sent
 * back. In addition:
 *   - If the heartbeat's term equals ours and we are not leader:
 *       - the sender's receive time is recorded in pNextIndex;
 *       - minMatchIndex is adopted;
 *       - the election timer is reset;
 *       - followers/learners enqueue a local commit command carrying the
 *         sender's commit index and term.
 *   - If we are leader and the heartbeat's term is >= ours, a local step-down
 *     command is enqueued.
 *
 * Return value:
 *   - -1 if the reply message cannot be built (previously its NULL payload
 *     would have been dereferenced);
 *   - 0 otherwise.
 * Local-command messages that fail to build are logged and skipped. Messages
 * that cannot be enqueued, either because the enqueue fails or because no
 * queue callback is set, are freed instead of leaked.
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
    return 0;
  }

  // The reply payload is written below, so a failed build must not be ignored.
  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0) {
    sError("vgId:%d, failed to build heartbeat reply since %s", ths->vgId, terrstr());
    return -1;
  }
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  sTrace("vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId,
         DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, failed to build commit msg from heartbeat since %s", ths->vgId, terrstr());
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd =
            (ths->state == TAOS_SYNC_STATE_LEARNER) ? SYNC_LOCAL_CMD_LEARNER_CMT : SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->commitIndex = pMsg->commitIndex;
        pSyncMsg->currentTerm = pMsg->term;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId,
                   pMsg->commitIndex, pMsg->term);
          }
        } else {
          // Nobody will consume the message; free it instead of leaking it.
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEADER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, failed to build step-down msg since %s", ths->vgId, terrstr());
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->currentTerm = pMsg->term;
      pSyncMsg->commitIndex = pMsg->commitIndex;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
        }
      } else {
        // Nobody will consume the message; free it instead of leaking it.
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}

2474
/*
 * Handle a heartbeat reply from a peer.
 *
 * Looks up the log replication manager for the sender and logs the round-trip
 * time. It then records the reply time in pMatchIndex (read by
 * syncNodeHeartbeatReplyTimeout) and delegates to
 * syncLogReplProcessHeartbeatReply().
 *
 * Returns -1 if the sender has no replication manager, otherwise the
 * delegate's result.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "0x%016" zero-pads to 16 hex digits; the former "0x016%" printed a literal "016" before the value.
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}

2494
/*
 * Legacy heartbeat-reply handler.
 *
 * Logs the reply with its round-trip time and records the reply time for the
 * sender in pMatchIndex, which syncNodeHeartbeatReplyTimeout() reads to judge
 * peer liveness. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* pTraceId = &pRpcMsg->info.traceId;
  char            traceBuf[40] = {0};
  TRACE_TO_STR(pTraceId, traceBuf);

  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             nowMs = taosGetTimestampMs();

  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, traceBuf);
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}

S
Shengliang Guan 已提交
2510 2511
/*
 * Execute a local command that was enqueued from heartbeat handling.
 *
 *   - STEP_DOWN: step down to the carried term.
 *   - FOLLOWER_CMT / LEARNER_CMT:
 *       - if the log buffer's last match term equals the carried term, adopt
 *         the carried commit index;
 *       - then commit the log buffer up to the node's commit index;
 *       - an empty log buffer is logged and skipped.
 *   - Anything else is logged as an error.
 *
 * Always returns 0; failures are only logged.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->currentTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
    case SYNC_LOCAL_CMD_LEARNER_CMT: {
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        break;
      }

      SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
      if (lastMatchTerm == pCmd->currentTerm) {
        (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
      }

      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2537 2538 2539 2540 2541 2542 2543 2544 2545 2546
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2547

2548
/*
 * Turn a client request into a raft entry and append it (leader only).
 *
 * The entry is built at the log buffer's end index with the current term.
 * TDMT_SYNC_CLIENT_REQUEST messages are decoded from their payload; all other
 * message types are wrapped as-is.
 *
 * On a leader:
 *   - *pRetIndex (if non-NULL) receives the entry's index;
 *   - the entry is passed to syncNodeAppend().
 * On a non-leader the entry is destroyed and -1 is returned.
 *
 * Returns -1 if the entry cannot be built, otherwise syncNodeAppend()'s result.
 * This replaces an unused outer `code` that was shadowed by an inner
 * declaration.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = index;
  }

  // syncNodeAppend() takes over pEntry, including destroying it on rejection.
  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2581 2582 2583
/* Map a sync state to its lowercase display name; unrecognised values yield "unknown". */
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    default:
      break;
  }

  return name;
}
2599

2600
/*
 * Locate this node inside pNewCfg and store its position in pNewCfg->myIndex.
 *
 * Each nodeInfo entry is turned into a raft id with this node's vgId and
 * compared to myRaftId; the first match wins.
 *
 * Returns 0 when found, -1 (leaving myIndex untouched) otherwise.
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t found = -1;

  for (int32_t pos = 0; found < 0 && pos < pNewCfg->totalReplicaNum; ++pos) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };
    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      found = pos;
    }
  }

  if (found < 0) return -1;

  pNewCfg->myIndex = found;
  return 0;
}

2616 2617 2618 2619
/* True for a single-replica node (other than vgId 1) handling a message type that syncUtilUserCommit() accepts. */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2620
/* True when pRaftId matches one of the node's first totalReplicaNum replica ids. */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;
  while (idx < ths->totalReplicaNum && !syncUtilSameId(&((ths->replicasId)[idx]), pRaftId)) {
    ++idx;
  }
  return idx < ths->totalReplicaNum;
}

/*
 * Return the snapshot sender in the replica slot whose id matches pDestId.
 *
 * If several slots match, the one with the highest index wins. The scan runs
 * backwards for that reason. Returns NULL when nothing matches.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return (ths->senders)[slot];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
2638

2639 2640
/*
 * Return the peer heartbeat timer slot whose replica id matches pDestId.
 *
 * If several slots match, the one with the highest index wins. The scan runs
 * backwards for that reason. Returns NULL when nothing matches.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return &((ths->peerHeartbeatTimerArr)[slot]);
    }
  }
  return NULL;
}

M
Minghao Li 已提交
2649 2650
/*
 * Return the peer-state slot whose replica id matches pDestId.
 *
 * If several slots match, the one with the highest index wins. The scan runs
 * backwards for that reason. Returns NULL when nothing matches.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t slot = ths->totalReplicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return &((ths->peerStates)[slot]);
    }
  }
  return NULL;
}

/*
 * Decide whether an AppendEntries message for pDestId should be sent.
 *
 * Returns false when:
 *   - the peer has no state slot (logged as a possibly dropped replica); or
 *   - the same next index (prevLogIndex + 1) was already sent to it less than
 *     SYNC_APPEND_ENTRIES_TIMEOUT_MS ago.
 * Otherwise returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool duplicate = (pPeer->lastSendIndex == nextSendIndex) && (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !duplicate;
}

M
Minghao Li 已提交
2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689
/*
 * Check whether a configuration change may start now.
 *
 * Refuses (logging a distinct message for each case) when:
 *   - a change is already in progress;
 *   - commitIndex is at least SYNC_INDEX_BEGIN but differs from the last log
 *     index;
 *   - any peer has a started snapshot sender.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool mustMatchLast = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (mustMatchLast && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pSender == NULL || !pSender->start) continue;

    sError("sync cannot change3");
    return false;
  }

  return true;
}