syncMain.c 88.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
C
cadem 已提交
40
#include "syncUtil.h"
M
Minghao Li 已提交
41

M
Minghao Li 已提交
42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
// Open a sync node described by pSyncInfo and register it with syncNodeAdd.
//
// After registration the ping / election / heartbeat base lines and the
// message callbacks are copied from pSyncInfo into the node.
//
// Returns the rid assigned to the node, or -1 when the node cannot be opened
// or registered (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
88
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
89 90 91 92
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

127 128 129 130 131 132 133 134
// Run syncNodePostClose on the node registered under rid; a no-op when the
// node cannot be acquired.
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
135 136 137
// A new config is acceptable when syncNodeInConfig accepts it for this node
// and its replica count differs from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (!syncNodeInConfig(pSyncNode, pCfg)) {
    return false;
  }

  int delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}

S
Shengliang Guan 已提交
140
// Apply a new replica configuration to the sync node registered under rid.
//
// - Returns -1 when the node cannot be acquired.
// - Returns 0 without changing anything when the node's lastConfigIndex is
//   already >= pNewCfg->lastIndex.
// - Returns -1 with terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR when the new
//   config is rejected by syncNodeCheckNewConfig.
// - Otherwise records the new config index, performs the config change and,
//   if this node is currently leader, re-initializes every per-peer heartbeat
//   timer slot and restarts the heartbeat timer. Returns 0.
//
// Every log statement that reads pSyncNode is issued BEFORE the reference is
// released, so the node is never touched after syncNodeRelease.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) {
    sInfo("vgId:%d, no need Reconfig, current index:%" PRId64 ", new index:%" PRId64, pSyncNode->vgId,
          pSyncNode->raftCfg.lastConfigIndex, pNewCfg->lastIndex);
    syncNodeRelease(pSyncNode);
    return 0;
  }

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // set last so that nothing above can overwrite the error code
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // the peer set may have changed: rebuild all per-peer heartbeat timers
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
175

S
Shengliang Guan 已提交
176 177 178 179
// Dispatch an incoming sync message to the handler matching pMsg->msgType.
//
// Returns -1 when the sync module is not initialized or the node registered
// under rid cannot be acquired. For an unknown message type terrno is set to
// TSDB_CODE_MSG_NOT_PROCESSED and -1 is returned. Otherwise the handler's
// return code is passed through.
//
// The failure trace reads pSyncNode->vgId, so it is emitted before the
// reference is released.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
    case TDMT_SYNC_TIMEOUT_ELECTION:
      // both timeout flavours share one handler
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_FORCE_FOLLOWER:
      code = syncForceBecomeFollower(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
236
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
237
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
238
  if (pSyncNode == NULL) return -1;
239

S
Shengliang Guan 已提交
240
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
241
  syncNodeRelease(pSyncNode);
242 243 244
  return ret;
}

C
cadem 已提交
245 246
// Turn this node into a follower (reason string "force election") and send a
// success response built from the request's info back to the requester.
// Always returns 0.
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  syncNodeBecomeFollower(ths, "force election");

  SRpcMsg reply = {0};
  reply.code = 0;
  reply.pCont = pRpcMsg->info.rsp;
  reply.contLen = pRpcMsg->info.rspLen;
  reply.info = pRpcMsg->info;
  tmsgSendRsp(&reply);

  return 0;
}

259
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
260
  SSyncNode* pNode = syncNodeAcquire(rid);
261
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
262 263

  SRpcMsg rpcMsg = {0};
264
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
265 266 267
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
268
  if (ret == 1) {
269
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
270
    rpcSendResponse(&rpcMsg);
271 272
    return 0;
  } else {
273
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
274
    return -1;
275
  }
S
Shengliang Guan 已提交
276 277
}

M
Minghao Li 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
// Return the smallest match index recorded for any peer of pSyncNode, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum; later peers only lower it
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

294
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
295
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
296
  if (pSyncNode == NULL) {
297
    sError("sync begin snapshot error");
298 299
    return -1;
  }
300

301 302 303 304 305 306 307 308 309 310
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

311
  int32_t code = 0;
312
  int64_t logRetention = 0;
313

M
Minghao Li 已提交
314
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
315
    // mnode
316
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
317 318 319 320
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
321 322 323
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
324

C
cadem 已提交
325 326 327
  if (pSyncNode->totalReplicaNum > 1) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER 
        && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
328 329 330 331
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
332
    }
333
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
334 335
  }

M
Minghao Li 已提交
336
_DEL_WAL:
337

M
Minghao Li 已提交
338
  do {
339 340 341 342 343 344 345 346 347 348 349
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

350
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
351 352 353 354 355 356 357 358
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
359

M
Minghao Li 已提交
360
      } else {
361 362
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
363
      }
364
    }
M
Minghao Li 已提交
365
  } while (0);
366

S
Shengliang Guan 已提交
367
  syncNodeRelease(pSyncNode);
368 369 370 371
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
372
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
373
  if (pSyncNode == NULL) {
374
    sError("sync end snapshot error");
375 376 377
    return -1;
  }

378 379 380 381
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
382
    if (code != 0) {
383
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
384
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
385 386
      return -1;
    } else {
S
Shengliang Guan 已提交
387
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
388 389
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
390
  }
391

S
Shengliang Guan 已提交
392
  syncNodeRelease(pSyncNode);
393 394 395
  return code;
}

M
Minghao Li 已提交
396
// Call syncNodeStepDown with newTerm on the node registered under rid.
// Returns -1 when the node cannot be acquired, otherwise 0.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

408
// Tell whether pSyncNode can serve reads: it must be leader and have finished
// restoring. When the answer is false terrno explains why:
//   - TSDB_CODE_SYN_INTERNAL_ERROR  pSyncNode is NULL
//   - TSDB_CODE_SYN_NOT_LEADER      node is not leader
//   - TSDB_CODE_SYN_RESTORING       restore has not finished
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
  } else if (!pSyncNode->restoreFinish) {
    terrno = TSDB_CODE_SYN_RESTORING;
  } else {
    return true;
  }

  return false;
}

// rid-based wrapper around syncNodeIsReadyForRead. Returns false when the
// node cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync ready for read error");
    return false;
  }

  bool canRead = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);
  return canRead;
}
M
Minghao Li 已提交
440

441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
// rid-based wrapper around syncNodeSnapshotSending. Returns false when the
// node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);
  return sending;
}

// rid-based wrapper around syncNodeSnapshotRecving. Returns false when the
// node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);
  return receiving;
}

M
Minghao Li 已提交
463 464
// Pick a peer and hand leadership over to it.
//
// Fails with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has no peers.
// Returns 0 without doing anything when this node is not leader or has at
// most one replica. With exactly two peers the one with the larger match
// index is chosen (the first peer wins ties); otherwise the first peer is
// used. The result of syncNodeLeaderTransferTo is returned.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
486 487
// Propose a leader-transfer message naming newLeader as the next leader.
//
// Fails with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has a single
// replica. Returns -1 when the transfer message cannot be built; otherwise
// the result of syncNodePropose. The message content is freed here after the
// proposal has been handed over.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  if (code != 0 || rpcMsg.pCont == NULL) {
    // never dereference the content of a message that failed to build
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

508 509
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
510

S
Shengliang Guan 已提交
511
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
512 513 514
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
515 516 517 518 519
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
520
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
521 522
  }

523
  return state;
M
Minghao Li 已提交
524 525
}

526
// Return the largest recorded config index that is greater than the first
// recorded one and not beyond snapshotLastApplyIndex; when no entry
// qualifies the first recorded config index is returned. At least one
// config index must be recorded (asserted).
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  int32_t   slot = 0;
  while (slot < pSyncNode->raftCfg.configIndexCount) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[slot];
    bool      newer = candidate > best;
    bool      covered = candidate <= snapshotLastApplyIndex;
    if (newer && covered) {
      best = candidate;
    }
    ++slot;
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

542 543
// Fill pEpSet with the endpoints of every non-learner node in the raft
// config of the node registered under rid.
//
// pEpSet->numOfEps is reset to 0 first and stays 0 when the node cannot be
// acquired. Endpoints are packed densely into eps[0 .. numOfEps-1]: a skipped
// learner must not leave a hole, otherwise consumers iterating over numOfEps
// entries would read an unset slot and miss a real endpoint.
// When at least one endpoint was written, inUse is set to
// (myIndex + 1) % numOfEps.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;

    // write to the next free output slot, not to the config index
    SEp* pEp = &pEpSet->eps[pEpSet->numOfEps];
    tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}

S
Shengliang Guan 已提交
564
// rid-based wrapper around syncNodePropose. Returns -1 when the node cannot
// be acquired, otherwise the result of syncNodePropose.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return code;
  }

  sError("sync propose error");
  return -1;
}
M
Minghao Li 已提交
575

C
cadem 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603
// Block until the log buffer of the node registered under rid has caught up.
//
// "Caught up" means totalIndex and commitIndex are both non-negative,
// totalIndex >= commitIndex and their distance does not exceed
// SYNC_LEARNER_CATCHUP. While that is not the case the function logs and
// sleeps one second between checks. Returns -1 when the node cannot be
// acquired, otherwise 0 once caught up.
int32_t syncIsCatchUp(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync Node Acquire error since %d", errno);
    return -1;
  }

  while (pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
         pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
         pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP) {
    sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
          pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
          pSyncNode->pLogBuf->matchIndex);
    taosSsleep(1);
  }

  sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, pSyncNode->vgId,
        pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->matchIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

S
Shengliang Guan 已提交
604
// Propose pMsg on pSyncNode.
//
// Rejected with -1 when
//   - the node is not leader                      (TSDB_CODE_SYN_NOT_LEADER)
//   - restore is unfinished and vgId != 1         (TSDB_CODE_SYN_PROPOSE_NOT_READY)
//   - heartbeat replies have timed out            (TSDB_CODE_SYN_PROPOSE_NOT_READY)
//
// Optimized one-replica path: the request is handled inline through
// syncNodeOnClientRequest. On success the apply index/term are stored in
// pMsg->info.conn and 1 is returned; on failure terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned.
//
// Regular path: a response stub is registered, the message is wrapped into a
// client request and enqueued through the node's syncEqMsg callback. Returns
// -1 when the request cannot be built, otherwise the enqueue result. The stub
// is removed again on either failure. When seq is not NULL it receives the
// stub's sequence number once the request has been built (also when the
// enqueue fails).
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized because the failure log below prints it even when the
    // handler did not get far enough to assign an index
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
666
// Reset pSyncTimer to an idle per-peer heartbeat timer aimed at destId:
// no underlying timer, counter and logic clock at zero, interval taken from
// the node's heartbeat base line, timestamp set to now. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // target and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // runtime state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
677
// Arm the per-peer heartbeat timer pSyncTimer.
//
// The timer's callback data (SSyncHbTimerData) is looked up through
// pSyncTimer->hbDataRid; when none is registered a new one is allocated and
// registered. The data is filled from the timer and the timer is (re)set with
// an interval of timerMS / HEARTBEAT_TICK_NUM, passing the data's rid as the
// callback parameter.
//
// Returns 0 on success and also when the sync environment is not initialized
// (only an error is logged in that case). Returns -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when the callback data cannot be
// allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // allocation failed: report instead of dereferencing NULL
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return -1;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
705
// Disarm the per-peer heartbeat timer pSyncTimer: bump its logic clock, stop
// and clear the underlying timer, and unregister its callback data
// (hbDataRid becomes -1). Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

715
// Restore the log store from the FSM snapshot when the two are out of step.
//
// The snapshot's lastApplyIndex is compared with the log store's begin and
// last index: a restore is attempted when the log ends before that index or
// starts more than one entry after it. Returns -1 when the restore fails,
// otherwise 0. The log store, the FSM and its FpGetSnapshotInfo callback must
// be present (asserted).
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapInfo = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapInfo);

  SyncIndex snapshotVer = snapInfo.lastApplyIndex;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  bool logBehind = lastVer < snapshotVer;
  bool logAhead = firstVer > snapshotVer + 1;
  if (!logBehind && !logAhead) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
735
// open/close --------------
S
Shengliang Guan 已提交
736 737
// Allocate and initialize a sync node from the given options.
//   pSyncInfo - open options. pSyncInfo->syncCfg may be overwritten with the
//               config loaded from raft_config.json. pSyncInfo->pFsm is taken
//               over by the node (and set to NULL here) on success; on failure
//               it is freed, either directly or through syncNodeClose().
// Returns the new node, or NULL on any failure. On failure everything that was
// created so far is released through syncNodeClose().
// The node is only opened here; timers/raft roles are started by syncNodeStart().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set vgId first so that every log line below (including the cfg-file
  // phase) reports the real vgroup id instead of 0
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  // NOTE(review): copies sizeof(pSyncNode->path) bytes; assumes pSyncInfo->path is at least that large -- confirm
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      // the caller supplied a different config: it wins and is persisted
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      // otherwise the persisted config is reported back to the caller
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, totalReplicaNum:%d replicaNum:%d selfIndex:%d", pSyncNode->vgId,
        pCfg->totalReplicaNum, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // ownership of the FSM moves to the node here; from now on it is released by syncNodeClose()
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commit index starts as invalid and is raised to the snapshot's last
  // applied index when the FSM can report one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error so the node and the senders created so far are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // go through _error so the partially built node is not leaked
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // the FSM is still owned by the caller's options only if it was not handed
  // over to the node yet; in that case free it here
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose() accepts NULL and releases whatever was created so far
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1066 1067
// Raise the node's commit index to the FSM snapshot's last applied index when
// the snapshot is ahead. Does nothing if the FSM or its FpGetSnapshotInfo
// callback is not set.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // only ever move the commit index forward
  if (snap.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1076
// Restore the node's commit state from the log store and the log buffer.
//   pSyncNode - sync node with log store and log buffer already created
// Fails with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the log store has
// entries but the buffer's endIndex is not exactly one past its last index.
// Returns 0 on success, -1 on that mismatch or when syncLogBufferCommit fails.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  const SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  const SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  const SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  const bool bufferMismatch = (lastVer != -1) && (endIndex != lastVer + 1);
  if (bufferMismatch) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  // keep whichever commit index is further ahead
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) ? -1 : 0;
}

// Start raft on an opened node and kick off the ping timer.
// Role selection:
//   - this node is configured as a learner   -> become learner
//   - not a learner and replicaNum is not 1  -> become follower
//   - not a learner and replicaNum is 1      -> next term, become leader, append a noop
// Returns the result of syncNodeStartPingTimer(); a non-zero result is logged.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  const SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;

  if (pCfg->nodeInfo[pCfg->myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
    syncNodeBecomeLearner(pSyncNode, "first start");
  } else if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return code;
}

B
Benguang Zhao 已提交
1126
// Start the node in standby mode: force follower state, stop heartbeats,
// arm the elect timer with TIMER_MAX_MS and start the ping timer.
// Returns -1 if either timer call reports a negative result, otherwise the
// result of syncNodeStartPingTimer().
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  if (syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
1147
// First stage of shutting a node down: stop the elect, heartbeat and ping
// timers (in that order) and clean the pending responses held by the
// response manager. The node, its FSM and FpApplyQueueItems must be set.
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  // timers: elect -> heartbeat -> ping; return codes are intentionally ignored
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStopPingTimer(pSyncNode);

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1165 1166 1167
// Release the new-node snapshot receiver, stopping it first if it is running.
// Does nothing when the node has no receiver.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // NOTE(review): the message says "preclose" although this is the post-close path -- confirm wording
  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);

  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1178
// Release a heartbeat timer data object by handing it to taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1179

M
Minghao Li 已提交
1180
// Tear down a sync node and free it. Accepts NULL (no-op).
// Order: clean pending responses, stop timers, destroy the replication
// managers, destroy the per-node helper objects, destroy snapshot senders and
// the receiver, free the FSM, close the raft store, free the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers and replication
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplDestroy(pSyncNode);

  // helper objects; each pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // snapshot senders: stop the running ones, then destroy
  for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[i];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[i] = NULL;
  }

  // snapshot receiver: same treatment
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // the node owns its FSM
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1238
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1239

M
Minghao Li 已提交
1240 1241 1242
// timer control --------------
// Arm the ping timer with the node's pingTimerMS and sync the timer's logic
// clock to the user clock. When the sync environment is not initialized the
// timer is not armed and an error is logged. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: bump the user logic clock, stop the timer handle and
// clear it. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer.
//   ms - timeout in milliseconds; stored in electTimerMS
// Records the expected execution time and the current elect logic clock in
// electTimerParam, then resets the timer with the node's rid as the callback
// argument. When the sync environment is not initialized nothing is changed
// and an error is logged. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  // fill the timer parameter block
  int64_t expireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), expireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stop the election timer: bump the elect logic clock, stop the timer handle
// and clear it. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and arm it again with the given timeout.
// The results of the stop/start calls are not propagated; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1297
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a standby
// node, otherwise a random value from syncUtilElectRandomMS() over
// [electBaseLine, 2 * electBaseLine].
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->raftCfg.isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1312
// Arm the node-level heartbeat timer with heartbeatTimerMS and sync its logic
// clock to the user clock. When the sync environment is not initialized an
// error is logged instead. The trace line is emitted in both cases.
// Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1326
// Start the heartbeat timer of every peer that has one.
// Only the per-peer timers are used here; the node-level heartbeat timer is
// not started by this function. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1344 1345
// Stop the heartbeat timer of every peer that has one.
// Only the per-peer timers are touched here; the node-level heartbeat timer
// is not stopped by this function. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return 0;
}

1363 1364 1365 1366 1367 1368
// Stop and then start the peer heartbeat timers.
// The results of both calls are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1369 1370 1371 1372 1373 1374 1375 1376
// Send a message to the peer identified by destRaftId.
//   destRaftId - target raft id; matched against the node's peers by addr
//   pNode      - sending node
//   pMsg       - message; its content is converted with syncUtilMsgHtoN() and
//                marked noResp before being handed to pNode->syncSendMSg
// Returns the send callback's result. Returns -1 without sending when no peer
// matches or no send callback is installed. On any negative result the error
// is logged, pMsg->pCont is freed with rpcFreeCont() and terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the first peer with a matching address
  SEpSet* pEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pEpSet == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pEpSet = &pNode->peersEpset[peer];
    }
  }

  int32_t code = -1;
  if (pEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pEpSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  // failure path: the message content is released here
  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, pEpSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;

  return code;
}

1395
// Tell whether this node is a member of the given config.
// Membership is checked two ways over the first totalReplicaNum entries
// (by fqdn + port, and by raft id built from the entry and the node's vgId);
// both checks are asserted to agree. Returns the fqdn + port result.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool b1 = false;  // found by fqdn + port
  bool b2 = false;  // found by raft id

  for (int32_t i = 0; i < pCfg->totalReplicaNum && !b1; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    b1 = (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
         (pInfo->nodePort == pNode->myNodeInfo.nodePort);
  }

  for (int32_t i = 0; i < pCfg->totalReplicaNum && !b2; ++i) {
    SRaftId raftId = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&raftId, &pNode->myRaftId)) {
      b2 = true;
    }
  }

  ASSERT(b1 == b2);
  return b1;
}

1423
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
C
cadem 已提交
1424
  if (pOldCfg->totalReplicaNum != pNewCfg->totalReplicaNum) return true;
1425
  if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
C
cadem 已提交
1426
  for (int32_t i = 0; i < pOldCfg->totalReplicaNum; ++i) {
1427 1428 1429 1430
    const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
    const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
    if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
    if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
C
cadem 已提交
1431
    if(pOldInfo->nodeRole != pNewInfo->nodeRole) return true;
1432 1433 1434 1435 1436
  }

  return false;
}

M
Minghao Li 已提交
1437
// Apply a new replica configuration to the sync node.
//
// pNewConfig            configuration to install (copied into pSyncNode->raftCfg.cfg)
// lastConfigChangeIndex stored as raftCfg.lastConfigIndex and recorded via syncAddCfgIndex()
//
// Nothing happens when syncIsConfigChanged() reports no difference. Otherwise the
// config is installed and persisted with syncWriteCfgFile(). When this node is part
// of the new config, the peer tables, replica ids, quorum, index/vote managers and
// snapshot senders are rebuilt, and the node re-enters its current role
// (leader -> syncNodeBecomeLeader + noop entry, anything else -> syncNodeBecomeFollower).
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the real vgId (was hard-coded to 1)
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // membership is checked against myNodeInfo/myRaftId, which are still the old values here
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // arguments now line up with the specifiers: two replica totals (%d), two
  // lastIndex values (PRId64), then the replica count of the new config (%d)
  sNInfo(pSyncNode, "begin do config change, from %d to %d, from %" PRId64 " to %" PRId64 ", replicas:%d",
         oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum, (int64_t)oldConfig.lastIndex,
         (int64_t)pNewConfig->lastIndex, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: a replica that already had a sender keeps it, moved to its new slot
    for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) {
      bool reset = false;
      for (int32_t j = 0; j < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[j]);

          pSyncNode->senders[i] = oldSenders[j];
          oldSenders[j] = NULL;  // ownership moved, must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // NOTE(review): the sender passed to sSError is NULL here -- confirm the macro tolerates that
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not moved to a new slot is no longer referenced
    for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
}

M
Minghao Li 已提交
1610 1611
// raft state change --------------

// Adopt `term` when it is greater than the stored term: store it, become
// follower (with the new term in the debug string) and clear the vote.
// Does nothing when `term` is not greater than the stored term.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1621
// Store `term` when it is greater than the stored term; the node state and the
// vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= raftStoreGetTerm(pSyncNode)) {
    return;
  }
  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1627
// Step down in response to `newTerm`.
//   newTerm <  current term : ignored (trace only)
//   newTerm == current term : become follower unless already a follower
//   newTerm >  current term : store the term, become follower, clear the vote
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  const SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);

  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  {
    sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
  }

  if (currentTerm == newTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // newer term observed
  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1652 1653
// Run syncRespCleanRsp() on the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1654
// Move the node into the follower role.
//
// debugStr is appended to the "become follower" trace line.
// Steps, in order: drop the leader cache if we were leader, zero hbSlowNum,
// set the state, stop the heartbeat timer, trace, clean pending responses,
// invoke FpBecomeFollowerCb (if set), invalidate minMatchIndex, reset the log
// buffer and reset the election timer.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a former leader no longer knows who the leader is
  const bool wasLeader = (TAOS_SYNC_STATE_LEADER == pSyncNode->state);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // role switch; followers do not send heartbeats
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);

  // answer clients still waiting on this node
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, when it registered a callback
  if (NULL != pSyncNode->pFsm && NULL != pSyncNode->pFsm->FpBecomeFollowerCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  syncNodeResetElectTimer(pSyncNode);
}

C
cadem 已提交
1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707
// Move the node into the learner role.
//
// debugStr is appended to the "become learner" trace line.
// Zeroes hbSlowNum, sets the state, invokes FpBecomeLearnerCb (if set),
// invalidates minMatchIndex and resets the log buffer.
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->hbSlowNum = 0;

  // role switch
  pSyncNode->state = TAOS_SYNC_STATE_LEARNER;

  sNTrace(pSyncNode, "become learner %s", debugStr);

  // notify the state machine, when it registered a callback
  if (NULL != pSyncNode->pFsm && NULL != pSyncNode->pFsm->FpBecomeLearnerCb) {
    pSyncNode->pFsm->FpBecomeLearnerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
}

M
Minghao Li 已提交
1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1726
// Move the node into the leader role and (re)initialise leader-side state.
//
// debugStr is appended to the "become leader" log line.
// Sets nextIndex of every replica to lastIndex + 1 and matchIndex to
// SYNC_INDEX_INVALID, re-inits the peer states, stops a running snapshot
// receiver, swaps the election timer for the heartbeat timer, sends a heartbeat
// immediately, invokes FpBecomeLeaderCb (if set) and resets the log buffer.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex: the last index/term lookup does not depend on the replica, so it
  // is done once instead of once per replica; every replica gets the same value
  if (pSyncNode->pNextIndex->replicaNum > 0) {
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);

    for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
      pSyncNode->pNextIndex->index[i] = lastIndex + 1;
    }
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition.
//
// Asserts the node is a candidate. Returns (after logging an error) when the
// granted votes are not a majority. Otherwise becomes leader, appends a noop
// entry (a failure is only logged) and logs term / commit index / last index.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLastIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);
}

M
Minghao Li 已提交
1822 1823
// True when the node's vgId is 1.
// NOTE(review): presumably vgId 1 is reserved for the mnode -- confirm.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return 1 == pSyncNode->vgId;
}

M
Minghao Li 已提交
1824
// Reset every peer-state slot: lastSendIndex to SYNC_INDEX_INVALID and
// lastSendTime to 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  const int32_t slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotNum; ++slot) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }

  return 0;
}

// Follower -> candidate transition: asserts the node is a follower, sets the
// state to candidate and logs term / commit index / last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition: asserts the node is leader, becomes follower
// and logs term / commit index / last log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition: asserts the node is a candidate, becomes
// follower and logs term / commit index / last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1863 1864
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1865
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
1866
  ASSERT(term == raftStoreGetTerm(pSyncNode));
1867 1868
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);
M
Minghao Li 已提交
1869

S
Shengliang Guan 已提交
1870
  raftStoreVote(pSyncNode, pRaftId);
M
Minghao Li 已提交
1871 1872
}

M
Minghao Li 已提交
1873
// simulate get vote from outside
1874 1875
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1876

S
Shengliang Guan 已提交
1877 1878
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1879
  if (ret != 0) return;
M
Minghao Li 已提交
1880

S
Shengliang Guan 已提交
1881
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1882 1883
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
1884
  pMsg->term = currentTerm;
M
Minghao Li 已提交
1885 1886 1887 1888
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1889
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1890 1891
}

M
Minghao Li 已提交
1892
// return if has a snapshot
M
Minghao Li 已提交
1893 1894
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1895
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1896 1897
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1898 1899 1900 1901 1902 1903 1904
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1905 1906
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1907
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1908
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1909 1910
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1911 1912 1913 1914 1915 1916 1917
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1918 1919
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1920 1921
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1922 1923
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1924
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1925 1926
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1927 1928
    }

M
Minghao Li 已提交
1929 1930 1931
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1932 1933 1934 1935
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1936
  } else {
M
Minghao Li 已提交
1937 1938
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1939
  }
M
Minghao Li 已提交
1940

M
Minghao Li 已提交
1941 1942 1943 1944 1945 1946 1947
  return lastTerm;
}

// Fill *pLastIndex and *pLastTerm with the last index and last term (snapshot
// included). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  const SyncIndex lastIdx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = lastIdx;

  const SyncTerm lastTrm = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = lastTrm;

  return 0;
}
M
Minghao Li 已提交
1950

M
Minghao Li 已提交
1951
// return append-entries first try index
M
Minghao Li 已提交
1952 1953 1954 1955 1956
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1957 1958
// if index > 0, return index - 1
// else, return -1
1959 1960 1961 1962 1963 1964 1965 1966 1967
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1968 1969 1970 1971
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1972 1973 1974 1975 1976 1977 1978 1979 1980
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1981 1982 1983
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1984
  SSyncRaftEntry* pPreEntry = NULL;
1985 1986 1987 1988 1989 1990 1991
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

1992
    pSyncNode->pLogStore->cacheHit++;
1993 1994 1995
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
1996
    pSyncNode->pLogStore->cacheMiss++;
1997 1998 1999 2000
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2001 2002 2003 2004 2005 2006

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2007
  if (code == 0) {
2008
    ASSERT(pPreEntry != NULL);
2009
    preTerm = pPreEntry->term;
2010 2011 2012 2013

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2014
      syncEntryDestroy(pPreEntry);
2015 2016
    }

2017 2018
    return preTerm;
  } else {
2019 2020 2021 2022
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2023 2024 2025 2026
      }
    }
  }

2027
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2028
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2029 2030
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2031 2032 2033 2034

// Fill *pPreIndex and *pPreTerm with the index and term preceding `index`.
// Always returns 0; an unavailable term is reported as whatever
// syncNodeGetPreTerm returns.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  const SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  const SyncTerm prevTrm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTrm;

  return 0;
}

M
Minghao Li 已提交
2039
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2040
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2041

S
Shengliang Guan 已提交
2042 2043 2044
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2045
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2046 2047
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2048
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2049 2050
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2051
    }
M
Minghao Li 已提交
2052

M
Minghao Li 已提交
2053
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2054 2055
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2056
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2057 2058
      rpcFreeCont(rpcMsg.pCont);
      return;
2059
    }
M
Minghao Li 已提交
2060

S
Shengliang Guan 已提交
2061
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2062
  }
M
Minghao Li 已提交
2063 2064
}

M
Minghao Li 已提交
2065
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2066
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2067

M
Minghao Li 已提交
2068 2069
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2070

2071
  if (pNode == NULL) return;
M
Minghao Li 已提交
2072 2073 2074 2075 2076

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2077

2078
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2079 2080 2081 2082
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2083

S
Shengliang Guan 已提交
2084
  SRpcMsg rpcMsg = {0};
2085 2086
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2087

S
Shengliang Guan 已提交
2088
  if (code != 0) {
M
Minghao Li 已提交
2089
    sError("failed to build elect msg");
M
Minghao Li 已提交
2090
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2091
    return;
M
Minghao Li 已提交
2092 2093
  }

S
Shengliang Guan 已提交
2094
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2095
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2096 2097 2098

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2099
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2100
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2101
    syncNodeRelease(pNode);
2102
    return;
M
Minghao Li 已提交
2103
  }
M
Minghao Li 已提交
2104 2105

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2106 2107
}

M
Minghao Li 已提交
2108
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2109
  if (!syncIsInit()) return;
2110

S
Shengliang Guan 已提交
2111
  SSyncNode* pNode = param;
C
cadem 已提交
2112
  if (pNode->totalReplicaNum > 1) {
S
Shengliang Guan 已提交
2113 2114
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2115
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2116 2117 2118
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2119
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2120
        return;
2121
      }
M
Minghao Li 已提交
2122

2123
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2124 2125
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2126
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2127 2128
        rpcFreeCont(rpcMsg.pCont);
        return;
2129
      }
S
Shengliang Guan 已提交
2130 2131 2132 2133

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2134
    } else {
S
Shengliang Guan 已提交
2135 2136
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2137
    }
M
Minghao Li 已提交
2138 2139 2140
  }
}

2141
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2142
  int64_t hbDataRid = (int64_t)param;
2143
  int64_t tsNow = taosGetTimestampMs();
2144

2145 2146
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
C
cadem 已提交
2147
    sError("hb timer get pData NULL, rid:%" PRId64 " addr:%" PRId64, hbDataRid, pData->destId.addr);
2148 2149
    return;
  }
2150

2151
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2152
  if (pSyncNode == NULL) {
2153
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2154
    sError("hb timer get pSyncNode NULL");
2155 2156 2157 2158 2159 2160 2161 2162
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2163
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2164 2165 2166
    return;
  }

M
Minghao Li 已提交
2167
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2168 2169
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2170
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2171 2172 2173
    return;
  }

C
cadem 已提交
2174
  sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
2175

C
cadem 已提交
2176
  if (pSyncNode->totalReplicaNum > 1) {
M
Minghao Li 已提交
2177 2178 2179
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2180
    if (timerLogicClock == msgLogicClock) {
2181 2182 2183 2184 2185 2186
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2187 2188
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2189 2190 2191
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2192
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2193
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2194
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2195
        pSyncMsg->privateTerm = 0;
2196
        pSyncMsg->timeStamp = tsNow;
2197 2198 2199 2200 2201 2202

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
C
cadem 已提交
2203
        sTrace("vgId:%d, send heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
2204 2205
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2206 2207 2208
      } else {
      }

M
Minghao Li 已提交
2209 2210
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2211 2212
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2213 2214 2215 2216
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2217
    } else {
M
Minghao Li 已提交
2218 2219
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2220 2221
    }
  }
2222 2223 2224

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2225 2226
}

2227 2228
// Deleter handed to the LRU cache by syncCacheEntry: frees the cached value.
// key and keyLen are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2229 2230 2231 2232
// Inserts pEntry into the LRU cache of pLogStore, keyed by pEntry->index, with low priority.
// The charge passed to the cache is sizeof(*pEntry) + pEntry->dataLen, and deleteCacheEntry is
// registered as the deleter. h is forwarded to taosLRUCacheInsert.
// Returns 0 when the insert status is TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2244
// Appends pEntry to the log buffer of ths and advances the match index.
// With a single replica (replicaNum <= 1) the commit index is updated and the log buffer is
// committed right away; with more replicas the function returns after proceeding.
// pEntry is destroyed here when it is rejected (dataLen smaller than SMsgHead) or when the
// log buffer append fails; in the latter case syncFsmExecute is invoked with terrno first.
// Returns 0 on success, -1 on failure.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // an entry must at least carry a message head
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // append to log buffer
  int32_t appendCode = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendCode < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // proceed match index, with replicating on needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  if (ths->replicaNum <= 1) {
    // single replica: commit immediately up to the new match index
    (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2285
// Reports whether heartbeat replies have timed out for at least `quorum` peers.
// A peer counts as timed out when (now - its recv time) exceeds tsHeartbeatTimeout.
// Peers whose recv time is 0 or -1 are skipped. Always false when totalReplicaNum == 1.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->totalReplicaNum == 1) return false;

  int32_t timedOutPeers = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[idx]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);

    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      timedOutPeers++;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2308 2309 2310
// Returns true when any of the first totalReplicaNum entries of pSyncNode->senders is non-NULL
// and has its `start` flag set. Returns false for a NULL node.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->totalReplicaNum; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender != NULL && pSender->start) {
      return true;
    }
  }

  return false;
}

// Returns true when pSyncNode has a pNewNodeReceiver whose `start` flag is set.
// Returns false for a NULL node or a NULL receiver.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2327
// Builds a noop entry at the end index of the log buffer with the current term and appends it
// through syncNodeAppend.
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built; otherwise
// returns the result of syncNodeAppend, so an append failure is reported to the caller instead
// of being silently turned into success.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // pEntry is handed over to syncNodeAppend and must not be touched afterwards
  return syncNodeAppend(ths, pEntry);
}

// Builds a noop entry at the write index of the log store with the current term.
// When the node is the leader, the entry is appended to the log store and then put into the
// entry cache. If the cache returned a handle, the handle is released; otherwise the entry is
// destroyed here.
// Returns 0 on success, -1 when appending to the log store fails (the entry is destroyed in
// that case as well, so it is not leaked).
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // the entry is still owned by this function on this path
      syncEntryDestroy(pEntry);
      return -1;
    }

    // h stays NULL when the entry was not cached
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2370 2371
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
2372
  bool           resetElect = false;
2373

M
Minghao Li 已提交
2374 2375 2376 2377
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

2378
  int64_t tsMs = taosGetTimestampMs();
M
Minghao Li 已提交
2379
  int64_t timeDiff = tsMs - pMsg->timeStamp;
M
Minghao Li 已提交
2380
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);
2381

S
Shengliang Guan 已提交
2382 2383 2384 2385 2386
  if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
    sWarn(
        "vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
        "cluster:%d",
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
2387 2388 2389
    return 0;
  }

2390 2391
  SRpcMsg rpcMsg = {0};
  (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
2392
  SyncTerm currentTerm = raftStoreGetTerm(ths);
2393 2394

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
2395 2396
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
2397
  pMsgReply->term = currentTerm;
2398
  pMsgReply->privateTerm = 8864;  // magic number
2399
  pMsgReply->startTime = ths->startTime;
2400
  pMsgReply->timeStamp = tsMs;
2401

C
cadem 已提交
2402 2403 2404 2405
  sTrace(
        "vgId:%d, heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
        ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);

2406
  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
2407
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
2408
    resetElect = true;
2409

M
Minghao Li 已提交
2410
    ths->minMatchIndex = pMsg->minMatchIndex;
2411

C
cadem 已提交
2412
    if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
S
Shengliang Guan 已提交
2413 2414 2415 2416
      SRpcMsg rpcMsgLocalCmd = {0};
      (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
2417
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
2418 2419 2420
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;
      SyncIndex fcIndex = pSyncMsg->commitIndex;
2421 2422 2423 2424 2425 2426 2427

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
2428
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
2429 2430
        }
      }
2431 2432 2433
    }
  }

C
cadem 已提交
2434
  if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
2435 2436 2437 2438
    SRpcMsg rpcMsgLocalCmd = {0};
    (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
2439
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
2440 2441
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;
2442

S
Shengliang Guan 已提交
2443 2444
    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
2445 2446 2447 2448
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
2449
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
2450
      }
2451 2452 2453
    }
  }

C
cadem 已提交
2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468
  if (pMsg->term >= currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_LEARNER_CMT;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
2469
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
C
cadem 已提交
2470 2471 2472 2473
      }
    }
  }

2474 2475
  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
2476 2477

  if (resetElect) syncNodeResetElectTimer(ths);
2478 2479 2480
  return 0;
}

2481
// Handles a heartbeat reply received by ths.
// Looks up the log replication manager of the replying peer, records the receive time of the
// reply in ths->pMatchIndex and forwards the reply to syncLogReplProcessHeartbeatReply.
// Returns -1 when no replication manager exists for the peer; otherwise returns the result of
// syncLogReplProcessHeartbeatReply.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // print the address as a zero-padded 16-digit hex value
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
}

2501
// Older heartbeat-reply handler: logs the received reply and records its receive time for the
// sender in ths->pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            traceStr[40] = {0};
  const STraceId* pTraceId = &pRpcMsg->info.traceId;
  TRACE_TO_STR(pTraceId, traceStr);

  int64_t recvMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, recvMs - pReply->timeStamp, traceStr);

  // update last reply time, make decision whether the other node is alive or not
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, recvMs);

  return 0;
}

S
Shengliang Guan 已提交
2517 2518
// Executes a local command message on ths. Always returns 0.
//  - SYNC_LOCAL_CMD_STEP_DOWN: steps down to the term carried by the message.
//  - SYNC_LOCAL_CMD_FOLLOWER_CMT: updates the commit index only when the message term equals the
//    last match term of the log buffer, then commits the log buffer.
//  - SYNC_LOCAL_CMD_LEARNER_CMT: stores the message term, updates the commit index, then commits.
// Both commit commands do nothing except log when the log buffer is empty.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN: {
      syncNodeStepDown(ths, pCmd->currentTerm);
      break;
    }

    case SYNC_LOCAL_CMD_FOLLOWER_CMT: {
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        return 0;
      }

      SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
      if (pCmd->currentTerm == lastMatchTerm) {
        (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
      }

      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    case SYNC_LOCAL_CMD_LEARNER_CMT: {
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        return 0;
      }

      raftStoreSetTerm(ths, pCmd->currentTerm);
      (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
      sTrace("vgId:%d, start to commit raft log in heartbeat. commit index:%" PRId64 "", ths->vgId, ths->commitIndex);

      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    default: {
      sError("error local cmd");
      break;
    }
  }

  return 0;
}

M
Minghao Li 已提交
2557 2558 2559 2560 2561 2562 2563 2564 2565 2566
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2567

2568
// Turns a client request into a raft entry at the end index of the log buffer with the current
// term, and appends it when ths is the leader.
// pRetIndex, when not NULL, receives the index of the entry (only on the leader path).
// Returns -1 when the entry cannot be built or when ths is not the leader (the entry is
// destroyed in that case); otherwise returns the result of syncNodeAppend.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  // only the leader may append client requests
  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = index;
  }

  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2601 2602 2603
// Maps a sync state to its lowercase name; states without a dedicated case map to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    case TAOS_SYNC_STATE_LEARNER:
      name = "learner";
      break;
    default:
      break;
  }

  return name;
}
2619

2620
// Finds the position of ths in pNewCfg->nodeInfo (compared by raft id, using the vgId of ths)
// and stores it in pNewCfg->myIndex.
// Returns 0 when found, -1 when none of the first totalReplicaNum entries matches.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t idx = 0;

  while (idx < pNewCfg->totalReplicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[idx]),
        .vgId = ths->vgId,
    };

    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      pNewCfg->myIndex = idx;
      return 0;
    }

    ++idx;
  }

  return -1;
}

2636 2637 2638 2639
// True when ths has exactly one replica, the message type passes syncUtilUserCommit, and the
// vgId of ths is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2640
// Returns true when pRaftId equals one of the first totalReplicaNum ids in ths->replicasId.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;

  // advance until a matching replica id is found or the list is exhausted
  while (idx < ths->totalReplicaNum && !syncUtilSameId(&((ths->replicasId)[idx]), pRaftId)) {
    ++idx;
  }

  return idx < ths->totalReplicaNum;
}

// Returns the snapshot sender stored at the position of pDestId in ths->replicasId, or NULL
// when no replica id matches. All totalReplicaNum entries are scanned; if several ids match,
// the sender of the last match is returned.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;

  for (int32_t idx = 0; idx < ths->totalReplicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = (ths->senders)[idx];
  }

  return pFound;
}
M
Minghao Li 已提交
2658

2659 2660
// Returns a pointer to the peer heartbeat timer stored at the position of pDestId in
// ths->replicasId, or NULL when no replica id matches. All totalReplicaNum entries are scanned;
// if several ids match, the timer of the last match is returned.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  for (int32_t idx = 0; idx < ths->totalReplicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = &((ths->peerHeartbeatTimerArr)[idx]);
  }

  return pFound;
}

M
Minghao Li 已提交
2669 2670
// Returns a pointer to the peer state stored at the position of pDestId in ths->replicasId, or
// NULL when no replica id matches. All totalReplicaNum entries are scanned; if several ids
// match, the state of the last match is returned.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;

  for (int32_t idx = 0; idx < ths->totalReplicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = &((ths->peerStates)[idx]);
  }

  return pFound;
}

// Decides whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for pDestId, or when the same index
// (pMsg->prevLogIndex + 1) was already sent less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago;
// returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextSendIndex);
  bool sentRecently = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709
// Reports whether the sync node may be changed right now. Returns false when
//  - the node is already flagged as changing,
//  - commitIndex >= SYNC_INDEX_BEGIN and differs from the last log index, or
//  - a snapshot sender towards any peer has its `start` flag set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastLogIndex = syncNodeGetLastIndex(pSyncNode);
    if (lastLogIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t peerIdx = 0; peerIdx < pSyncNode->peersNum; ++peerIdx) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peerIdx]);
    if (pPeerSender == NULL) continue;

    if (pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}