syncMain.c 83.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
45
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
46
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
47 48 49
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
50 51 52 53 54 55 56 57 58 59 60
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
61

62
// Opens a sync node from pSyncInfo, registers it to obtain a reference id,
// and copies the timer baselines and message callback from pSyncInfo.
// Returns the reference id of the new node, or -1 on failure.
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (!pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  // the rid must be stored on the node before any close path runs
  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
83

B
Benguang Zhao 已提交
84
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
85
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
86
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
87
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
88 89 90 91
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
92
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
93
    goto _err;
M
Minghao Li 已提交
94
  }
M
Minghao Li 已提交
95

B
Benguang Zhao 已提交
96 97 98 99
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
100

B
Benguang Zhao 已提交
101 102
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
103

104 105 106
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
107 108
}

M
Minghao Li 已提交
109
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
110
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
111
  if (pSyncNode != NULL) {
112
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
113
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
114
    syncNodeRemove(rid);
M
Minghao Li 已提交
115 116 117
  }
}

M
Minghao Li 已提交
118 119
// Runs the pre-close step (syncNodePreClose) on the sync node identified by
// rid. Does nothing when the node cannot be acquired.
void syncPreStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return;

  syncNodePreClose(pNode);
  syncNodeRelease(pNode);
}

126 127 128 129 130 131 132 133
// Runs the post-close step (syncNodePostClose) on the sync node identified by
// rid. Does nothing when the node cannot be acquired.
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return;

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
134 135 136
// Validates a proposed configuration: this node must be part of pCfg, and the
// replica count may differ from the current one by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
139
// Applies a new replica configuration to the sync node identified by rid.
//
// The new configuration is first validated with syncNodeCheckNewConfig(); on
// rejection terrno is set to TSDB_CODE_SYN_NEW_CONFIG_ERROR. When the node is
// currently the leader, its heartbeat timers are stopped, every per-peer
// heartbeat timer slot is re-initialized against the (new) replica ids, and the
// heartbeat timers are started again.
//
// Returns 0 on success, -1 if the node cannot be acquired or the new
// configuration is invalid.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Read vgId while the reference is still held; the node must not be
    // touched once it has been released.
    const int32_t vgId = pSyncNode->vgId;
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", vgId);
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // rebuild every peer heartbeat timer slot from the current replica ids
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
167

S
Shengliang Guan 已提交
168 169 170 171
// Dispatches one sync message to the handler matching pMsg->msgType.
//
// Returns the handler's result code. Returns -1 when the sync environment is
// not initialized, when the node for rid cannot be acquired, or when the
// message type is not handled here (terrno = TSDB_CODE_MSG_NOT_PROCESSED).
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT_ELECTION:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // Log while the reference is still held: the message reads pSyncNode->vgId,
  // which must not be accessed after syncNodeRelease().
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
225
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
226
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
227
  if (pSyncNode == NULL) return -1;
228

S
Shengliang Guan 已提交
229
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
230
  syncNodeRelease(pSyncNode);
231 232 233
  return ret;
}

234
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
235
  SSyncNode* pNode = syncNodeAcquire(rid);
236
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
237 238

  SRpcMsg rpcMsg = {0};
239
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
240 241 242
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
243
  if (ret == 1) {
244
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
245
    rpcSendResponse(&rpcMsg);
246 247
    return 0;
  } else {
248
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
249
    return -1;
250
  }
S
Shengliang Guan 已提交
251 252
}

M
Minghao Li 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
// Returns the smallest match index recorded for any peer of pSyncNode, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum; later peers can only lower it
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

269
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
270
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
271
  if (pSyncNode == NULL) {
272
    sError("sync begin snapshot error");
273 274
    return -1;
  }
275

276 277 278 279 280 281 282 283 284 285
  SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
  bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

  if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
    sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
    syncNodeRelease(pSyncNode);
    return 0;
  }

286
  int32_t code = 0;
287
  int64_t logRetention = 0;
288

M
Minghao Li 已提交
289
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
290
    // mnode
291
    logRetention = tsMndLogRetention;
M
Minghao Li 已提交
292 293 294 295
  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
296 297 298
      logRetention = SYNC_VNODE_LOG_RETENTION;
    }
  }
M
Minghao Li 已提交
299

300 301 302 303 304 305
  if (pSyncNode->replicaNum > 1) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
              lastApplyIndex);
      syncNodeRelease(pSyncNode);
      return 0;
306
    }
307
    logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
308 309
  }

M
Minghao Li 已提交
310
_DEL_WAL:
311

M
Minghao Li 已提交
312
  do {
313 314 315 316 317 318 319 320 321 322 323
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

324
        code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
325 326 327 328 329 330 331 332
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
333

M
Minghao Li 已提交
334
      } else {
335 336
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
337
      }
338
    }
M
Minghao Li 已提交
339
  } while (0);
340

S
Shengliang Guan 已提交
341
  syncNodeRelease(pSyncNode);
342 343 344 345
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
346
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
347
  if (pSyncNode == NULL) {
348
    sError("sync end snapshot error");
349 350 351
    return -1;
  }

352 353 354 355
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
356
    if (code != 0) {
357
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
358
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
359 360
      return -1;
    } else {
S
Shengliang Guan 已提交
361
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
362 363
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
364
  }
365

S
Shengliang Guan 已提交
366
  syncNodeRelease(pSyncNode);
367 368 369
  return code;
}

M
Minghao Li 已提交
370
// Public wrapper around syncNodeStepDown() that resolves the node from rid
// and passes newTerm through. Returns 0 once the call has been made, or -1
// when the node cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);

  return 0;
}

382
// Reports whether pSyncNode can serve reads: it must be the leader and must
// have finished restoring. On a negative answer terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR (NULL node), TSDB_CODE_SYN_NOT_LEADER or
// TSDB_CODE_SYN_RESTORING, checked in that order.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (!pSyncNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  const bool isLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  terrno = TSDB_CODE_SYN_RESTORING;
  return false;
}

// Public wrapper around syncNodeIsReadyForRead() that resolves the node from
// rid. Returns false when the node cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync ready for read error");
    return false;
  }

  const bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);

  return isReady;
}
M
Minghao Li 已提交
414

415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
// Public wrapper around syncNodeSnapshotSending() that resolves the node from
// rid. Returns false when the node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return false;

  const bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);

  return sending;
}

// Public wrapper around syncNodeSnapshotRecving() that resolves the node from
// rid. Returns false when the node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return false;

  const bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);

  return receiving;
}

M
Minghao Li 已提交
437 438
// Picks a peer and asks it to take over leadership.
//
// Fails with TSDB_CODE_SYN_ONE_REPLICA when the node has no peers. When the
// node is not the leader (or has a single replica) nothing is done and 0 is
// returned. The first peer is the default target; with exactly two peers the
// second one is chosen instead if its match index is strictly greater.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex firstMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex secondMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (secondMatch > firstMatch) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}

M
Minghao Li 已提交
460 461
// Proposes a leader-transfer message that names newLeader as the next leader.
//
// Fails with TSDB_CODE_SYN_ONE_REPLICA when the group has a single replica.
// Returns -1 if the transfer message cannot be built; otherwise returns the
// result of syncNodePropose(). The message content is freed here after the
// propose call.
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  // rpcMsg starts zeroed, so a NULL pCont means the builder produced no
  // content; dereferencing it below would crash.
  if (code < 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

482 483
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
484

S
Shengliang Guan 已提交
485
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
486 487 488
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
489 490 491 492 493
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
494
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
495 496
  }

497
  return state;
M
Minghao Li 已提交
498 499
}

500
// Returns the largest recorded config index that does not exceed
// snapshotLastApplyIndex, starting from (and never going below) the first
// entry of raftCfg.configIndexArr. Requires at least one recorded index.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= best) continue;                    // not an improvement
    if (candidate > snapshotLastApplyIndex) continue;   // beyond the snapshot
    best = candidate;
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

516 517
// Fills pEpSet with the fqdn/port of every replica in the node's raft config
// and marks the replica after this node (wrapping around) as the one in use.
// pEpSet->numOfEps is reset to 0 first, so it stays 0 when the node cannot be
// acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    return;
  }

  int32_t idx = 0;
  while (idx < pNode->raftCfg.cfg.replicaNum) {
    SEp* pTarget = &pEpSet->eps[idx];
    tstrncpy(pTarget->fqdn, pNode->raftCfg.cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pTarget->port = (pNode->raftCfg.cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pTarget->fqdn, pTarget->port);
    ++idx;
  }

  // guard the modulo against an empty endpoint set
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
537
// Public wrapper around syncNodePropose() that resolves the node from rid.
// Returns -1 when the node cannot be acquired; otherwise forwards the result
// of syncNodePropose().
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync propose error");
    return -1;
  }

  int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);

  return code;
}
M
Minghao Li 已提交
548

S
Shengliang Guan 已提交
549
// Proposes pMsg on pSyncNode.
//
// Preconditions checked here (each failure returns -1 with terrno set):
//   - the node is the leader                     (TSDB_CODE_SYN_NOT_LEADER)
//   - the node has finished restoring, unless
//     its vgId is 1                               (TSDB_CODE_SYN_PROPOSE_NOT_READY)
//   - heartbeat replies have not timed out        (TSDB_CODE_SYN_PROPOSE_NOT_READY)
//
// Two paths follow:
//   - optimized one-replica path: the request is handled inline through
//     syncNodeOnClientRequest(); on success the apply index/term are written
//     into pMsg->info.conn and 1 is returned, on failure -1 is returned with
//     terrno = TSDB_CODE_SYN_INTERNAL_ERROR.
//   - normal path: a response stub is registered, the message is wrapped in a
//     client request and enqueued through pSyncNode->syncEqMsg. The stub is
//     removed again if building or enqueueing fails. When seq is non-NULL it
//     receives the stub's sequence number (also when enqueueing failed).
//     Returns the enqueue result, or -1 if the request could not be built.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialized because the failure branch logs retIndex even if the handler
    // never wrote to it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // normal path: register a response stub, then enqueue the wrapped request
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
611
// Resets one per-peer heartbeat timer slot: no live timer handle, counter and
// logic clock at zero, interval taken from the node's heartbeat baseline,
// callback set to syncNodeEqPeerHeartbeatTimer, and destination set to
// destId. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // target and callback
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;

  // timing
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timeStamp = taosGetTimestampMs();

  // runtime state
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
622
// Arms one per-peer heartbeat timer.
//
// The timer's SSyncHbTimerData is looked up by pSyncTimer->hbDataRid and
// allocated/registered on first use. It is then refreshed with the node rid,
// destination, current logic clock and next execution time before the timer
// is (re)set to fire every timerMS / HEARTBEAT_TICK_NUM.
//
// Returns 0 on success and also when the sync environment is not initialized
// (only an error is logged in that case). Returns -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY if the timer data cannot be allocated.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // do not dereference a failed allocation
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    // the callback receives the timer-data rid, not a pointer to the data
    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
648
// Disarms one per-peer heartbeat timer: bumps its logic clock, stops and
// clears the timer handle, removes the associated timer data and resets
// hbDataRid to -1. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // advance the logic clock first, before the timer itself is stopped
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

658
// Re-aligns the log store with the FSM snapshot when they have diverged.
//
// The snapshot's lastApplyIndex is treated as the commit index. If the log
// already covers it (lastVer >= commitIndex and firstVer <= commitIndex + 1)
// nothing is done. Otherwise the log store is restored from the snapshot.
// Returns 0 on success or when no restore is needed, -1 if the restore fails.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snap = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);

  SSyncLogStore* pStore = pNode->pLogStore;
  SyncIndex      commitIndex = snap.lastApplyIndex;
  SyncIndex      firstVer = pStore->syncLogBeginIndex(pStore);
  SyncIndex      lastVer = pStore->syncLogLastIndex(pStore);

  // log and snapshot are contiguous: nothing to restore
  if (lastVer >= commitIndex && firstVer <= commitIndex + 1) {
    return 0;
  }

  if (pStore->syncLogRestoreFromSnapshot(pStore, commitIndex)) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
678
// open/close --------------
S
Shengliang Guan 已提交
679 680
// Create and initialize a sync node from pSyncInfo.
//
// Steps, in order: allocate the node, make sure the sync directory exists,
// create or load raft_config.json, refresh dnode info, build the log buffer,
// raft ids, vote/index managers, log store, timers, response manager,
// snapshot senders/receiver, replication manager and peer state, and finally
// initialize the log buffer.
//
// Ownership of pSyncInfo->pFsm: on success it is moved into the node and
// pSyncInfo->pFsm is set to NULL. On failure it is freed here, either directly
// (if not yet moved) or through syncNodeClose().
//
// Returns the new node, or NULL on any failure. Every failure path goes
// through _error so that the partially built node is released by
// syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set vgId before anything logs it: the config-file handling below prints
  // pSyncNode->vgId and would otherwise report 0
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      // the caller supplied a different config: it wins and is persisted
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      // otherwise the config on disk is reported back to the caller
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // pFsm ownership moves to the node here; from now on syncNodeClose() frees it
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreOpen(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts from the snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error so the node and the senders created so far are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // go through _error so the node is released instead of leaked
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplMgrInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  //
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // pFsm not yet moved into the node: free it here, the caller gave it up
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose() accepts NULL and releases whatever was built so far
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1007 1008
// Raise the node's commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing if no FSM or no FpGetSnapshotInfo callback
// is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // commitIndex only ever moves forward here
  if (pSyncNode->commitIndex < snap.lastApplyIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1017
// Bring the node's commit index up to date with the log store and commit the
// log buffer up to that index.
// Fails with TSDB_CODE_WAL_LOG_INCOMPLETE when the log store is non-empty
// (lastVer != -1) and the log buffer's endIndex is not lastVer + 1.
// Returns 0 on success, -1 on failure.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex storeLastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex bufEndIndex = pSyncNode->pLogBuf->endIndex;

  bool storeHasLog = (storeLastVer != -1);
  if (storeHasLog && bufEndIndex != storeLastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), bufEndIndex - 1, storeLastVer);
    return -1;
  }

  ASSERT(bufEndIndex == storeLastVer + 1);

  // never move the commit index backwards
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommitIndex);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  int32_t code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  return (code < 0) ? -1 : 0;
}

// Start raft on the node and then start its ping timer.
// With a single replica the node advances the term, becomes leader and
// appends a no-op entry; otherwise it starts as a follower.
// Returns the result of syncNodeStartPingTimer().
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  // start raft
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  if (code != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return code;
}

B
Benguang Zhao 已提交
1062
// Start the node in standby: force follower state, stop the heartbeat timers,
// restart the elect timer with TIMER_MAX_MS, then start the ping timer.
// Returns -1 if either timer call reports a negative result, otherwise the
// result of syncNodeStartPingTimer().
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  if (code < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  code = syncNodeStartPingTimer(pSyncNode);
  if (code < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return code;
}

M
Minghao Li 已提交
1083
// First stage of closing a node: stop the elect, heartbeat and ping timers
// (in that order) and clean the pending responses of the response manager.
// The node, its FSM and the FSM's FpApplyQueueItems callback must be set.
void syncNodePreClose(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode != NULL);
  ASSERT(pSyncNode->pFsm != NULL);
  ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);

  // timers: elect, then heartbeat, then ping
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeStopPingTimer(pSyncNode);

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1101 1102 1103
// Stop (if started) and destroy the node's new-node snapshot receiver, then
// clear the pointer. Does nothing when there is no receiver.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  // a running receiver has to be stopped before it is destroyed
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1114
// Release a heartbeat timer data object with taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1115

M
Minghao Li 已提交
1116
// Tear down a sync node and free it. A NULL node is accepted and ignored.
// Order: clean pending responses, stop the ping/elect/heartbeat timers,
// destroy the replication managers, destroy the response/vote/index managers,
// log store and log buffer, stop and destroy the snapshot senders and the
// receiver, free the FSM, close the raft store, and free the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  syncRespCleanRsp(pSyncNode->pSyncRespMgr);

  // timers and replication
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeLogReplMgrDestroy(pSyncNode);

  // managers and log; every pointer is cleared right after its destroy call
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // snapshot senders: skip empty slots, stop the running ones before destroy
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA; ++idx) {
    if (pSyncNode->senders[idx] == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[idx]);

    if (snapshotSenderIsStart(pSyncNode->senders[idx])) {
      snapshotSenderStop(pSyncNode->senders[idx], false);
    }

    snapshotSenderDestroy(pSyncNode->senders[idx]);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // the node owns its FSM
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  raftStoreClose(pSyncNode);

  taosMemoryFree(pSyncNode);
}

1174
// Return the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1175

M
Minghao Li 已提交
1176 1177 1178
// timer control --------------
// Arm the ping timer with pingTimerMS and sync the timer's logic clock with
// the user-side clock. When the sync env is not initialized only an error is
// logged. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: bump the user-side logic clock, stop the timer and
// clear its handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the elect timer to fire after ms milliseconds.
// Records ms as electTimerMS, fills electTimerParam (execute time, current
// logic clock, node pointer, NULL data) and resets the timer with the node's
// rid as the callback parameter. When the sync env is not initialized only an
// error is logged. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  // publish when the timer is due and which logic clock it belongs to
  int64_t dueTime = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), dueTime);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);

  return 0;
}

// Stop the elect timer: bump its logic clock, stop the timer and clear its
// handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the elect timer and start it again with a timeout of ms milliseconds.
// The results of the stop/start calls are not inspected. Always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1233
// Restart the elect timer with a fresh timeout: TIMER_MAX_MS for a standby
// node, otherwise a random value from syncUtilElectRandomMS() between
// electBaseLine and 2 * electBaseLine.
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t timeoutMS = pSyncNode->raftCfg.isStandBy
                          ? TIMER_MAX_MS
                          : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, timeoutMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          timeoutMS);
}

M
Minghao Li 已提交
1248
// Arm the node-level heartbeat timer with heartbeatTimerMS and sync the
// timer's logic clock with the user-side clock. When the sync env is not
// initialized an error is logged instead. The trace line is emitted in both
// cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1262
// Start the per-peer heartbeat timer of every peer that has one.
// The node-level heartbeat timer path is compiled out (#if 0).
// Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  code = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return code;
}

M
Minghao Li 已提交
1280 1281
// Stop the per-peer heartbeat timer of every peer that has one.
// The node-level heartbeat timer path is compiled out (#if 0).
// Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;  // no timer registered for this peer
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return code;
}

1299 1300 1301 1302 1303 1304
// Stop and then start the peer heartbeat timers. The results of both calls
// are not inspected. Always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

1305 1306 1307 1308 1309 1310 1311 1312
// Send pMsg to the peer whose raft address equals destRaftId->addr.
// The message is sent only when a matching peer exists and a send callback is
// registered; before sending, the content is converted with syncUtilMsgHtoN()
// and the message is marked as not expecting a response.
// On a negative result (including "no peer" / "no callback", which yield -1)
// the error is logged, pMsg->pCont is released with rpcFreeCont() and terrno
// is set to TSDB_CODE_SYN_INTERNAL_ERROR.
// Returns the send callback's result, or -1 when nothing was sent.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // look up the endpoint set of the destination peer
  SEpSet* pDestEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pDestEpSet == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pDestEpSet = &pNode->peersEpset[peer];
    }
  }

  int32_t code = -1;
  if (pDestEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pDestEpSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, pDestEpSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;

  return code;
}

1331
// Tell whether this node is a member of pCfg.
// Membership is determined twice: by comparing fqdn and port with the node's
// own node info, and by comparing the raft id built from each entry with the
// node's own raft id. Both answers are asserted to agree; the first one is
// returned.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool matchByEp = false;
  bool matchById = false;

  // pass 1: same fqdn and port
  for (int32_t idx = 0; idx < pCfg->replicaNum && !matchByEp; ++idx) {
    bool sameFqdn = (strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0);
    if (sameFqdn && pCfg->nodeInfo[idx].nodePort == pNode->myNodeInfo.nodePort) {
      matchByEp = true;
    }
  }

  // pass 2: same raft id
  for (int32_t idx = 0; idx < pCfg->replicaNum && !matchById; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      matchById = true;
    }
  }

  ASSERT(matchByEp == matchById);
  return matchByEp;
}

1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
// Compare two sync configs.
// They differ when replicaNum or myIndex differ, or when any of the first
// replicaNum entries differs in fqdn or port. Other node fields are not
// compared. Returns true when the configs differ.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pPrev = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pNext = &pNewCfg->nodeInfo[idx];

    bool sameFqdn = (strcmp(pPrev->nodeFqdn, pNext->nodeFqdn) == 0);
    if (!sameFqdn) return true;
    if (pPrev->nodePort != pNext->nodePort) return true;
  }

  return false;
}

M
Minghao Li 已提交
1372
// Applies pNewConfig to the node, recording lastConfigChangeIndex as the index of the change.
//
// Returns immediately when syncIsConfigChanged() reports no difference from the current config.
// Otherwise the raft cfg is replaced, the standby flag is recomputed and the config index is
// recorded. When this node is a member of the new config, its own identity, peers, replica ids,
// quorum, index/vote managers and snapshot senders are rebuilt, the cfg file is written, and the
// node re-enters its current role (leader, followed by a noop append, or follower). When it is
// not a member, only the cfg file is written.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being dropped when it was a member before and is not one any more
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  sNInfo(pSyncNode, "begin do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they were indexed by
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // the sender-scoped log macro is only given a real sender
      if (oldSenders[i] != NULL) {
        sSTrace(oldSenders[i], "snapshot sender save old");
      }
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
    int32_t j = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
        j++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: a replica that already had a sender keeps it, moved to its new slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // logged against the node: there is no sender object to log against
          sNError(pSyncNode, "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not carried over to the new layout
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      if (syncNodeAppendNoop(pSyncNode) < 0) {
        sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
      }
      // syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
}

M
Minghao Li 已提交
1543 1544
// raft state change --------------
// Adopts `term` when it is greater than the term returned by raftStoreGetTerm(): the new term is
// stored, the node becomes follower (with the new term in the debug string), and the recorded
// vote is cleared. A term that is not greater is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term > raftStoreGetTerm(pSyncNode)) {
    raftStoreSetTerm(pSyncNode, term);

    char tmpBuf[64];
    snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
    syncNodeBecomeFollower(pSyncNode, tmpBuf);

    raftStoreClearVote(pSyncNode);
  }
}

1554
// Stores `term` when it is greater than the term returned by raftStoreGetTerm(). Unlike
// syncNodeUpdateTerm(), the node's state and recorded vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term > raftStoreGetTerm(pSyncNode)) {
    raftStoreSetTerm(pSyncNode, term);
  }
}

M
Minghao Li 已提交
1560
// Steps the node down in response to `newTerm`.
// - newTerm older than the current term: traced and ignored.
// - newTerm newer: the term is stored, the node becomes follower, and the vote is cleared.
// - newTerm equal: the node becomes follower only if it is not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
  if (currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);

  if (currentTerm < newTerm) {
    raftStoreSetTerm(pSyncNode, newTerm);

    char tmpBuf[64];
    snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
    syncNodeBecomeFollower(pSyncNode, tmpBuf);

    raftStoreClearVote(pSyncNode);
  } else if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    // same term: nothing to store, only the role may need to change
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}

1585 1586
// Hands the node's response manager to syncRespCleanRsp().
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1587
// Moves the node into the follower state. `debugStr` is appended to the trace line.
// In order: clears the leader cache if the node was leader, zeroes hbSlowNum, sets the state and
// stops the heartbeat timer, traces, cleans pending responses, invokes the FSM's
// FpBecomeFollowerCb when one is set, invalidates minMatchIndex, resets the log buffer and
// finally resets the election timer.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // trace log
  sNTrace(pSyncNode, "become follower %s", debugStr);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1638
// Moves the node into the leader state. `debugStr` is appended to the final info line.
// Records the leader timestamp and counters, sets the state and leader cache, initialises every
// nextIndex slot to lastIndex + 1 and every matchIndex slot to SYNC_INDEX_INVALID, re-initialises
// peer state, stops a running new-node snapshot receiver, swaps the election timer for the
// heartbeat timer, sends heartbeats, invokes the FSM's FpBecomeLeaderCb when one is set,
// invalidates minMatchIndex and resets the log buffer.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // the last index is looked up once so that every replica slot starts from the same value
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

#if 0
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }
#endif

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Promotes a candidate to leader. Asserts the node is a candidate; if the granted votes do not
// form a majority, an error is logged and nothing changes. Otherwise the node becomes leader, a
// noop entry is appended (a failure is logged, not propagated), and the resulting term, commit
// index and last log index are logged.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
  if (!granted) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t ret = syncNodeAppendNoop(pSyncNode);
  if (ret < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(lastIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
        raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}

M
Minghao Li 已提交
1734 1735
// True when the node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
1736
// Resets every peerStates slot: lastSendIndex to SYNC_INDEX_INVALID and lastSendTime to 0.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[i].lastSendTime = 0;
  }

  return 0;
}

// Switches a follower to candidate. Asserts the node is currently a follower, sets the new state
// and logs the term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Demotes a leader to follower. Asserts the node is currently leader, runs
// syncNodeBecomeFollower() and logs the term, commit index and last log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Returns a candidate to follower. Asserts the node is currently a candidate, runs
// syncNodeBecomeFollower() and logs the term, commit index and last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
  SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1775 1776
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1777
// Records a vote for pRaftId in `term`.
// Asserts that `term` equals the stored term and that no vote has been recorded yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == raftStoreGetTerm(pSyncNode));
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);

  raftStoreVote(pSyncNode, pRaftId);
}

M
Minghao Li 已提交
1785
// simulate get vote from outside
1786 1787
// Votes for this node in `currentTerm` and feeds a locally built, granted request-vote reply
// (source and destination both this node) into the vote managers, as if it had arrived from
// outside. The reply buffer is freed before returning. Returns early, after the vote has been
// recorded, if the reply cannot be built.
void syncNodeVoteForSelf(SSyncNode* pSyncNode, SyncTerm currentTerm) {
  syncNodeVoteForTerm(pSyncNode, currentTerm, &pSyncNode->myRaftId);

  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
  if (ret != 0) return;

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  rpcFreeCont(rpcMsg.pCont);
}

M
Minghao Li 已提交
1804
// return if has a snapshot
M
Minghao Li 已提交
1805 1806
// Returns true when the FSM reports a snapshot whose lastApplyIndex is at least
// SYNC_INDEX_BEGIN. Returns false when there is no FSM or no FpGetSnapshotInfo callback.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
1817 1818
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1819
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1820
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1821 1822
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1823 1824 1825 1826 1827 1828 1829
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1830 1831
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1832 1833
// Returns the term of the last entry, taking the snapshot into account.
// When a snapshot exists (lastApplyIndex >= SYNC_INDEX_BEGIN) and the log's last index does not
// go past it, the snapshot's lastApplyTerm is returned; in every other case the log store's
// syncLogLastTerm() result is returned.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  bool      hasSnapshot = false;

  // the snapshot info is read once, so the existence check and the comparison see the same data
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    hasSnapshot = (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN);
  }

  if (hasSnapshot) {
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex <= snapshot.lastApplyIndex) {
      // the log does not extend beyond the snapshot
      return snapshot.lastApplyTerm;
    }
  }

  return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
}

// get last index and term along with snapshot
// Writes syncNodeGetLastIndex() to *pLastIndex and syncNodeGetLastTerm() to *pLastTerm.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
1862

M
Minghao Li 已提交
1863
// return append-entries first try index
M
Minghao Li 已提交
1864 1865 1866 1867 1868
// Returns the index immediately after syncNodeGetLastIndex().
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
1869 1870
// if index > 0, return index - 1
// else, return -1
1871 1872 1873 1874 1875 1876 1877 1878 1879
// Returns index - 1, clamped from below at SYNC_INDEX_INVALID. pSyncNode is not consulted.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex previous = index - 1;
  return (previous < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : previous;
}

M
Minghao Li 已提交
1880 1881 1882 1883
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1884 1885 1886 1887 1888 1889 1890 1891 1892
// Returns the term of the entry at index - 1.
// - index below SYNC_INDEX_BEGIN: SYNC_TERM_INVALID.
// - index equal to SYNC_INDEX_BEGIN: 0.
// - otherwise the entry is taken from the log store's LRU cache, or read through syncLogGetEntry()
//   on a cache miss. If that read fails, the snapshot's lastApplyTerm is used when the snapshot
//   ends exactly at index - 1. If none of these applies, an error is logged and
//   SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncIndex preIndex = index - 1;

  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    SyncTerm preTerm = pPreEntry->term;

    // a cached entry is handed back to the cache; one read from the log store is destroyed here
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }

    return preTerm;
  }

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1943 1944 1945 1946

// get pre index and term of "index"
// Writes syncNodeGetPreIndex(index) to *pPreIndex and syncNodeGetPreTerm(index) to *pPreTerm.
// Always returns 0; a failed term lookup is visible only as SYNC_TERM_INVALID in *pPreTerm.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}

M
Minghao Li 已提交
1951
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1952
  if (!syncIsInit()) return;
M
Minghao Li 已提交
1953

S
Shengliang Guan 已提交
1954 1955 1956
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1957
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1958 1959
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
1960
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
1961 1962
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1963
    }
M
Minghao Li 已提交
1964

M
Minghao Li 已提交
1965
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
1966 1967
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
1968
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
1969 1970
      rpcFreeCont(rpcMsg.pCont);
      return;
1971
    }
M
Minghao Li 已提交
1972

S
Shengliang Guan 已提交
1973
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
1974
  }
M
Minghao Li 已提交
1975 1976
}

M
Minghao Li 已提交
1977
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1978
  if (!syncIsInit()) return;
M
Minghao Li 已提交
1979

M
Minghao Li 已提交
1980 1981
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
1982

1983
  if (pNode == NULL) return;
M
Minghao Li 已提交
1984 1985 1986 1987 1988

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
1989

1990
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
1991 1992 1993 1994
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
1995

S
Shengliang Guan 已提交
1996
  SRpcMsg rpcMsg = {0};
1997 1998
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
1999

S
Shengliang Guan 已提交
2000
  if (code != 0) {
M
Minghao Li 已提交
2001
    sError("failed to build elect msg");
M
Minghao Li 已提交
2002
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2003
    return;
M
Minghao Li 已提交
2004 2005
  }

S
Shengliang Guan 已提交
2006
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2007
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2008 2009 2010

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2011
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2012
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2013
    syncNodeRelease(pNode);
2014
    return;
M
Minghao Li 已提交
2015
  }
M
Minghao Li 已提交
2016 2017

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2018 2019
}

M
Minghao Li 已提交
2020
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2021
  if (!syncIsInit()) return;
2022

S
Shengliang Guan 已提交
2023 2024 2025 2026
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2027
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2028 2029 2030
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2031
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2032
        return;
2033
      }
M
Minghao Li 已提交
2034

2035
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2036 2037
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2038
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2039 2040
        rpcFreeCont(rpcMsg.pCont);
        return;
2041
      }
S
Shengliang Guan 已提交
2042 2043 2044 2045

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2046
    } else {
S
Shengliang Guan 已提交
2047 2048
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2049
    }
M
Minghao Li 已提交
2050 2051 2052
  }
}

2053
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2054
  int64_t hbDataRid = (int64_t)param;
2055
  int64_t tsNow = taosGetTimestampMs();
2056

2057 2058
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2059
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2060 2061
    return;
  }
2062

2063
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2064
  if (pSyncNode == NULL) {
2065
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2066
    sError("hb timer get pSyncNode NULL");
2067 2068 2069 2070 2071 2072 2073 2074
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2075
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2076 2077 2078
    return;
  }

M
Minghao Li 已提交
2079
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2080 2081
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2082
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2083 2084 2085
    return;
  }

M
Minghao Li 已提交
2086
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2087 2088

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2089 2090 2091
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2092
    if (timerLogicClock == msgLogicClock) {
2093 2094 2095 2096 2097 2098
      if (tsNow > pData->execTime) {
        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

2099 2100
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

2101 2102 2103
        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
2104
        pSyncMsg->term = raftStoreGetTerm(pSyncNode);
2105
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
2106
        pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
2107
        pSyncMsg->privateTerm = 0;
2108
        pSyncMsg->timeStamp = tsNow;
2109 2110 2111 2112 2113 2114

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2115 2116
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2117 2118 2119
      } else {
      }

M
Minghao Li 已提交
2120 2121
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2122 2123
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2124 2125 2126 2127
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2128
    } else {
M
Minghao Li 已提交
2129 2130
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2131 2132
    }
  }
2133 2134 2135

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2136 2137
}

2138 2139
// Deleter handed to taosLRUCacheInsert by syncCacheEntry: releases the cached
// value. The key and its length are not needed for the release.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2140 2141 2142 2143
// Inserts pEntry into the log store's LRU cache, keyed by the entry's index.
// The handle pointer h is forwarded to taosLRUCacheInsert unchanged.
// Returns 0 when the cache reports TAOS_LRU_STATUS_OK, otherwise -1.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // size handed to the cache: entry struct plus its payload
  int32_t   entrySize = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              entrySize, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2155
// Appends pEntry to the node's sync log buffer and advances the match index.
//
// The entry is destroyed here when it is rejected (payload shorter than a
// message head) or when the log buffer refuses it; in the latter case the FSM
// is also notified with the current terrno.
// With more than one replica the function returns right after proceeding the
// buffer. With a single replica it additionally updates the commit index to the
// new match index and commits the buffer up to it.
//
// Returns 0 on success, -1 on rejection, append failure or commit failure.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // an entry must at least carry a message head
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // hand the entry over to the log buffer
  if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    ASSERT(terrno != 0);
    (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // advance the match index, replicating if needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  // multi replica: nothing more to do in this function
  if (ths->replicaNum > 1) return 0;

  // single replica: commit right away up to the new match index
  (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) >= 0) {
    return 0;
  }

  sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
  return -1;
}

2196
// Reports whether heartbeat replies have timed out for at least `quorum` peers.
// Always false for a single-replica node. Peers whose recorded receive time is
// 0 or -1 are skipped (presumably no reply has been recorded yet for them).
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int32_t timedOutPeers = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[idx]));
    bool    hasRecvTime = !(lastRecv == 0 || lastRecv == -1);

    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      ++timedOutPeers;
    }
  }

  return timedOutPeers >= pSyncNode->quorum;
}

2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237
// Returns true when any non-NULL snapshot sender of the node has its `start`
// flag set. A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    if (pSyncNode->senders[idx] == NULL) continue;
    if (pSyncNode->senders[idx]->start) return true;
  }

  return false;
}

// Returns true when the node has a snapshot receiver (pNewNodeReceiver) whose
// `start` flag is set. A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2238
// Builds a no-op raft entry at the end index of the log buffer, stamped with the
// current term, and appends it through syncNodeAppend.
//
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
// built; otherwise returns the result of syncNodeAppend (0 on success, -1 on
// failure) instead of unconditionally reporting success.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend destroys pEntry itself when it rejects it or fails to buffer
  // it, so there is nothing to clean up here; just propagate the outcome.
  return syncNodeAppend(ths, pEntry);
}

// Legacy no-op append: builds a no-op entry at the log store's write index and,
// when this node is the leader, writes it to the log store and tries to cache it.
//
// Ownership of pEntry: if the cache accepted it (a handle was returned) only the
// handle is released; in every other case, including a failed log append, the
// entry is destroyed here.
//
// Returns 0 on success (also when the node is not leader), -1 when the log
// store append fails.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = raftStoreGetTerm(ths);
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // The success path below still destroys an uncached entry, so the log
      // store does not own it; free it here instead of leaking it.
      syncEntryDestroy(pEntry);
      return -1;
    }

    // Best effort: a failed insert leaves h NULL and the entry is destroyed below.
    (void)syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2281 2282
// Handles a heartbeat message received from a peer and always answers it with
// a heartbeat reply.
//
// - If the message term equals the current term and this node is not leader:
//   the receive time is recorded, minMatchIndex is adopted from the message and
//   the election timer is reset at the end. A follower additionally enqueues a
//   SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the sender's commit index.
// - If the message term is >= the current term and this node is not a follower,
//   a SYNC_LOCAL_CMD_STEP_DOWN local command is enqueued.
//
// Local commands are enqueued through ths->syncEqMsg; a command that cannot be
// enqueued (callback missing or enqueue error) is freed here.
// Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  bool           resetElect = false;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
  SyncTerm currentTerm = raftStoreGetTerm(ths);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    resetElect = true;

    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      pSyncMsg->currentTerm = pMsg->term;
      // cache the value for logging: the buffer must not be read once enqueued
      SyncIndex fcIndex = pSyncMsg->commitIndex;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
        }
      } else {
        // nothing will consume the command; release it instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->currentTerm = pMsg->term;
    pSyncMsg->commitIndex = pMsg->commitIndex;

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        // log from the incoming message (same value as pSyncMsg->currentTerm):
        // the enqueued buffer is no longer ours to read
        sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pMsg->term);
      }
    } else {
      // nothing will consume the command; release it instead of leaking it
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  if (resetElect) syncNodeResetElectTimer(ths);
  return 0;
}

2360
// Handles a heartbeat reply from a peer: logs it, records the receive time for
// the peer in pMatchIndex, and forwards the reply to the peer's log replication
// manager.
//
// Returns -1 when no log replication manager exists for the sender; otherwise
// returns the result of syncLogReplMgrProcessHeartbeatReply.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" PRIx64: zero-padded 16-digit hex address after the "0x" prefix
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2380
// Legacy heartbeat-reply handler: logs the reply and records the time it was
// received for the sending peer. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            tbuf[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, tbuf);

  int64_t recvMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, recvMs - pReply->timeStamp, tbuf);

  // remember when this peer last replied; used to decide whether it is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, recvMs);
  return 0;
}

S
Shengliang Guan 已提交
2396 2397
// Executes a local command message.
//   SYNC_LOCAL_CMD_STEP_DOWN    - steps the node down to the term in the message.
//   SYNC_LOCAL_CMD_FOLLOWER_CMT - if the log buffer is not empty, updates the
//       commit index from the message (only when the message term equals the
//       buffer's last match term) and commits the buffer up to the node's
//       commit index.
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->currentTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT: {
      if (syncLogBufferIsEmpty(ths->pLogBuf)) {
        sError("vgId:%d, sync log buffer is empty.", ths->vgId);
        return 0;
      }

      SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
      if (pCmd->currentTerm == lastMatchTerm) {
        (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
      }

      // commit is attempted even when the commit index was not updated above
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2423 2424 2425 2426 2427 2428 2429 2430 2431 2432
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2433

2434
// Turns a client request into a raft entry (at the log buffer's end index, with
// the current term) and appends it when this node is the leader.
//
// pRetIndex, when non-NULL, receives the entry index before the append is
// attempted (leader only).
// Returns the result of syncNodeAppend on the leader; -1 when the entry cannot
// be built or when the node is not leader (the entry is destroyed in that case).
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = raftStoreGetTerm(ths);

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  // only the leader may append client requests
  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2467 2468 2469
// Maps a sync state to its lowercase name; any state not listed yields "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    default:
      break;
  }

  return name;
}
2483

2484
// Finds this node inside pNewCfg and stores its position in pNewCfg->myIndex.
// Each config node is compared against ths->myRaftId using its SYNC_ADDR and
// this node's vgId. Returns 0 when found, -1 when the node is not in the config.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t idx = 0; idx < pNewCfg->replicaNum; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[idx]),
        .vgId = ths->vgId,
    };

    if (!syncUtilSameId(&(ths->myRaftId), &candidate)) continue;

    pNewCfg->myIndex = idx;
    return 0;
  }

  return -1;
}

2500 2501 2502 2503
// True only when the node has a single replica, the message type passes
// syncUtilUserCommit, and the vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2504
// Returns true when pRaftId equals one of the node's replica ids.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;
  while (idx < ths->replicaNum) {
    if (syncUtilSameId(&((ths->replicasId)[idx]), pRaftId)) {
      return true;
    }
    ++idx;
  }
  return false;
}

// Looks up the snapshot sender stored at the replica slot whose id equals
// pDestId. Returns NULL when no replica id matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;

  // no early exit: every slot is examined, so the last matching slot wins
  for (int32_t idx = 0; idx < ths->replicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = (ths->senders)[idx];
  }

  return pFound;
}
M
Minghao Li 已提交
2522

2523 2524
// Looks up the peer heartbeat timer at the replica slot whose id equals
// pDestId. Returns NULL when no replica id matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  // no early exit: every slot is examined, so the last matching slot wins
  for (int32_t idx = 0; idx < ths->replicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = &((ths->peerHeartbeatTimerArr)[idx]);
  }

  return pFound;
}

M
Minghao Li 已提交
2533 2534
// Looks up the peer state at the replica slot whose id equals pDestId.
// Returns NULL when no replica id matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;

  // no early exit: every slot is examined, so the last matching slot wins
  for (int32_t idx = 0; idx < ths->replicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &((ths->replicasId)[idx]))) continue;
    pFound = &((ths->peerStates)[idx]);
  }

  return pFound;
}

// Decides whether an append-entries message should be sent to pDestId.
// Returns false when the peer has no state (logged as a possibly dropped
// replica), or when the same index (prevLogIndex + 1) was already sent to it
// less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago; true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex indexToSend = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == indexToSend);
  bool sentRecently = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573
// Reports whether the node may change now. It may not when:
//   - a change is already in progress (pSyncNode->changing),
//   - the commit index is at least SYNC_INDEX_BEGIN but differs from the last
//     log index,
//   - a snapshot sender for any peer has its `start` flag set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastLogIndex = syncNodeGetLastIndex(pSyncNode);
    if (lastLogIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[idx]);
    if (pPeerSender == NULL || !pPeerSender->start) continue;

    sError("sync cannot change3");
    return false;
  }

  return true;
}