syncMain.c 90.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
/*
 * Create a sync node from pSyncInfo and register it with the node reference set.
 * On success the node also receives the ping/election/heartbeat settings and the
 * message callbacks from pSyncInfo.
 * Returns the rid assigned by syncNodeAdd(), or -1 when opening or registering fails.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // not registered, so nobody can reach it by rid: close it here
    syncNodeClose(pNode);
    return -1;
  }

  // copy timer settings and message callbacks from the caller
  pNode->msgcb = pSyncInfo->msgcb;
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
88
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
89 90 91 92
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

127 128 129 130 131 132 133 134
/* Run syncNodePostClose() on the node identified by rid, if it can be acquired. */
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
135 136 137
/*
 * Accept a new configuration only if this node is part of it and the replica count
 * changes by at most one compared with the current one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
140
/*
 * Apply a new sync configuration to the node identified by rid.
 * The config is validated with syncNodeCheckNewConfig(), the new config index is
 * recorded and the config change is applied. On a leader, the per-peer heartbeat
 * timers are re-initialized against the updated replica ids and restarted.
 * Returns 0 on success. Returns -1 if the node cannot be acquired, or if the config
 * is rejected (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // capture vgId before releasing: pSyncNode must not be touched after syncNodeRelease()
    int32_t vgId = pSyncNode->vgId;
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    sError("vgId:%d, failed to reconfig since invalid new config", vgId);
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    // re-bind every peer heartbeat timer to the replica ids of the new config
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
168

S
Shengliang Guan 已提交
169 170 171 172
/*
 * Dispatch an incoming sync RPC message to the handler for its msgType.
 * Returns the handler's result, or -1 in three cases:
 *   - the sync env is not initialized;
 *   - rid cannot be acquired;
 *   - the type is unknown (terrno = TSDB_CODE_MSG_NOT_PROCESSED).
 * Failures are logged at debug level while the node is still held.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // log before releasing: pSyncNode must not be dereferenced after syncNodeRelease()
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
223
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
224
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
225
  if (pSyncNode == NULL) return -1;
226

S
Shengliang Guan 已提交
227
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
228
  syncNodeRelease(pSyncNode);
229 230 231
  return ret;
}

232
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
233
  SSyncNode* pNode = syncNodeAcquire(rid);
234
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
235 236

  SRpcMsg rpcMsg = {0};
237
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
238 239 240
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
241
  if (ret == 1) {
242
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
243
    rpcSendResponse(&rpcMsg);
244 245
    return 0;
  } else {
246
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
247
    return -1;
248
  }
S
Shengliang Guan 已提交
249 250
}

M
Minghao Li 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
/*
 * Return the smallest match index recorded in pMatchIndex across all peers.
 * Returns SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex minIndex = SYNC_INDEX_INVALID;
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
    // the first peer seeds the minimum; later peers can only lower it
    if (i == 0 || idx < minIndex) {
      minIndex = idx;
    }
  }
  return minIndex;
}

267
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
268
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
269
  if (pSyncNode == NULL) {
270
    sError("sync begin snapshot error");
271 272
    return -1;
  }
273

274 275
  int32_t code = 0;

M
Minghao Li 已提交
276
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
277 278 279
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
280 281 282
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
283 284 285
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
286 287
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
288
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
289 290 291
      return 0;
    }

M
Minghao Li 已提交
292 293 294
    goto _DEL_WAL;

  } else {
295 296 297 298 299 300 301 302 303 304
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
305 306 307 308
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

309
      lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
310

M
Minghao Li 已提交
311 312 313 314 315 316
      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
317 318 319 320
            sNTrace(pSyncNode,
                    "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                    " of dnode:%d, do not delete wal",
                    lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
M
Minghao Li 已提交
321

S
Shengliang Guan 已提交
322
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
323 324 325 326 327 328
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
329 330 331
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
332
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
333 334 335 336
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
337
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
338
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
339 340 341
        return 0;

      } else {
S
Shengliang Guan 已提交
342
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
343
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
344 345 346 347 348 349 350 351 352
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
353 354 355
    }
  }

M
Minghao Li 已提交
356
_DEL_WAL:
357

M
Minghao Li 已提交
358
  do {
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
379

M
Minghao Li 已提交
380
      } else {
381 382
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
383
      }
384
    }
M
Minghao Li 已提交
385
  } while (0);
386

S
Shengliang Guan 已提交
387
  syncNodeRelease(pSyncNode);
388 389 390 391
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
392
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
393
  if (pSyncNode == NULL) {
394
    sError("sync end snapshot error");
395 396 397
    return -1;
  }

398 399 400 401
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
402
    if (code != 0) {
403
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
404
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
405 406
      return -1;
    } else {
S
Shengliang Guan 已提交
407
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
408 409
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
410
  }
411

S
Shengliang Guan 已提交
412
  syncNodeRelease(pSyncNode);
413 414 415
  return code;
}

M
Minghao Li 已提交
416
/*
 * Make the node identified by rid step down with newTerm (see syncNodeStepDown).
 * Returns 0, or -1 if rid cannot be acquired.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

428
/*
 * Report whether a leader may serve reads. This is true once restoreFinish is set.
 * Before that, it is also true if both hold:
 *   - the apply queue is empty;
 *   - the last log entry is a TDMT_SYNC_NOOP from the current term.
 * On false, terrno is set as follows:
 *   - TSDB_CODE_SYN_INTERNAL_ERROR for a NULL node;
 *   - TSDB_CODE_SYN_NOT_LEADER when the node is not leader;
 *   - TSDB_CODE_SYN_RESTORING otherwise.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) return true;

  bool ready = false;

  // only inspect the log once the apply queue has drained
  if (pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm) &&
      !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
    int32_t         code = 0;

    if (h != NULL) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
      pSyncNode->pLogStore->cacheHit++;
      sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
    } else {
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
      code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
    }

    if (code == 0 && pEntry != NULL) {
      ready = (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->raftStore.currentTerm);

      // cached entries are released back to the cache; entries read from the store are destroyed here
      if (h != NULL) {
        taosLRUCacheRelease(pCache, h, false);
      } else {
        syncEntryDestroy(pEntry);
      }
    }
  }

  if (!ready) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }
  return ready;
}

/*
 * rid-based wrapper around syncNodeIsReadyForRead().
 * Returns false (and logs) if rid cannot be acquired.
 */
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync ready for read error");
    return false;
  }

  bool isReady = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);
  return isReady;
}
M
Minghao Li 已提交
503

504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
/* rid-based wrapper around syncNodeSnapshotSending(); false if rid cannot be acquired. */
bool syncSnapshotSending(int64_t rid) {
  bool       sending = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    sending = syncNodeSnapshotSending(pNode);
    syncNodeRelease(pNode);
  }
  return sending;
}

/* rid-based wrapper around syncNodeSnapshotRecving(); false if rid cannot be acquired. */
bool syncSnapshotRecving(int64_t rid) {
  bool       recving = false;
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    recving = syncNodeSnapshotRecving(pNode);
    syncNodeRelease(pNode);
  }
  return recving;
}

M
Minghao Li 已提交
526 527
/*
 * Transfer leadership away from this node.
 * The target is the first peer, except with exactly two peers, where the peer with the
 * higher match index is chosen.
 * Returns -1 (terrno = TSDB_CODE_SYN_ONE_REPLICA) when there are no peers. Returns 0
 * without doing anything when the node is not a multi-replica leader. Otherwise returns
 * the result of syncNodeLeaderTransferTo().
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex match0 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex match1 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (match1 > match0) target = 1;
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
549 550
/*
 * Propose a leader-transfer message naming newLeader as the next leader.
 * Returns -1 in two cases:
 *   - single-replica groups (terrno = TSDB_CODE_SYN_ONE_REPLICA);
 *   - the message cannot be built.
 * Otherwise returns the result of syncNodePropose().
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // the payload is dereferenced below, so a failed build must not fall through
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    if (rpcMsg.pCont != NULL) rpcFreeCont(rpcMsg.pCont);
    sNError(pSyncNode, "failed to build leader transfer msg since %s", terrstr());
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

571 572
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
573

S
Shengliang Guan 已提交
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
575 576 577
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
578 579 580 581 582
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
583
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
584 585
  }

586
  return state;
M
Minghao Li 已提交
587 588
}

589
/*
 * Return the largest recorded config index that does not exceed snapshotLastApplyIndex.
 * configIndexArr[0] is returned when no later entry qualifies; this happens even if that
 * first entry itself is greater than snapshotLastApplyIndex.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t i = 1; i < pSyncNode->raftCfg.configIndexCount; ++i) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[i];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

605 606
/*
 * Fill pEpSet with the endpoints of every replica in the node's current config.
 * inUse is set to the replica after this node (myIndex + 1, wrapping around).
 * If rid cannot be acquired, pEpSet is left with numOfEps == 0.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  for (int32_t i = 0; i < pNode->raftCfg.cfg.replicaNum; ++i) {
    const SNodeInfo* pInfo = &(pNode->raftCfg.cfg.nodeInfo)[i];
    SEp*             pEp = &pEpSet->eps[i];

    tstrncpy(pEp->fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = pInfo->nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, i, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
626
/*
 * rid-based wrapper around syncNodePropose().
 * Returns its result, or -1 (and logs) if rid cannot be acquired.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return result;
  }

  sError("sync propose error");
  return -1;
}
M
Minghao Li 已提交
637

S
Shengliang Guan 已提交
638
/*
 * Propose pMsg to the replication group led by pSyncNode.
 * Return values:
 *   -1  the node is not leader, not restored (for vgId != 1), heartbeat replies have
 *       timed out, or the message could not be handled; terrno is set as described below.
 *    1  the optimized single-replica path handled the message inline via
 *       syncNodeOnClientRequest(); pMsg->info.conn.applyIndex/applyTerm are filled in.
 *    otherwise the result of enqueueing a client-request message through syncEqMsg.
 *       A response stub is registered under a new sequence number, written to *seq when
 *       seq is non-NULL, and removed again if building or enqueueing fails.
 * terrno values on -1:
 *   - TSDB_CODE_SYN_NOT_LEADER when the node is not leader;
 *   - TSDB_CODE_SYN_PROPOSE_NOT_READY when not restored or on heartbeat timeout;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR when the optimized single-replica path fails.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
700
/*
 * Reset pSyncTimer as an unarmed per-peer heartbeat timer aimed at destId.
 * It uses the node's heartbeat baseline period and syncNodeEqPeerHeartbeatTimer as callback.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
711
/*
 * Arm the per-peer heartbeat timer.
 * The timer callback receives the rid of an SSyncHbTimerData record. That record is
 * reused when pSyncTimer->hbDataRid can still be acquired; otherwise it is allocated and
 * registered here. The record holds the node rid, the timer, destination, logic clock and
 * expected fire time.
 * Returns 0 on success, and also when the sync env is not initialized (only logged).
 * Returns -1 when the timer data cannot be allocated or registered.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
      return -1;
    }

    pData->rid = syncHbTimerDataAdd(pData);
    if (pData->rid < 0) {
      // not registered, so nothing else references pData: free it here
      sError("vgId:%d, failed to register hb timer data since %s", pSyncNode->vgId, terrstr());
      taosMemoryFree(pData);
      return -1;
    }
  }

  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  // the timer fires HEARTBEAT_TICK_NUM times per heartbeat period
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
737
/*
 * Disarm a peer heartbeat timer and unregister its timer-data record.
 * logicClock is incremented before the timer is stopped. hbDataRid is reset to -1,
 * so a later syncHbTimerStart allocates a fresh record. Always returns 0.
 */
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  // cancel the scheduled timer and forget its handle
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // drop the callback's data record and mark the slot as empty
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

747
/*
 * Make the log store consistent with the FSM snapshot.
 *
 * The snapshot's lastApplyIndex is used as the commit point. When the log store does
 * not reach that point contiguously, it is reset via syncLogRestoreFromSnapshot. That
 * happens when its last index is behind the commit point, or when its first index
 * starts after commit point + 1.
 *
 * Returns 0 when no restore was needed or the restore succeeded, -1 when it failed.
 */
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  SyncIndex commitIndex = snapshot.lastApplyIndex;

  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // nothing to do when [firstVer, lastVer] joins up with the snapshot's commit point
  bool coversCommit = (lastVer >= commitIndex) && (firstVer <= commitIndex + 1);
  if (coversCommit) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }
  return 0;
}

M
Minghao Li 已提交
767
// open/close --------------
S
Shengliang Guan 已提交
768 769
/*
 * Create and initialize a sync (raft) node from pSyncInfo.
 *
 * The sequence is:
 *   - ensure the node directory exists;
 *   - create raft_config.json from pSyncInfo when it is absent. Otherwise read it, and
 *     overwrite it with pSyncInfo->syncCfg when that config is non-empty and differs.
 *     Otherwise the file's config is copied back into pSyncInfo->syncCfg;
 *   - refresh dnode info of every replica (persisting the cfg when anything changed);
 *   - derive own, peer and replica raft ids;
 *   - read the raft store;
 *   - create vote/index managers, the log store and the response manager;
 *   - initialize timers, the snapshot senders/receiver, the replication manager, peer
 *     state and the log ring buffer.
 *
 * Ownership: pSyncInfo->pFsm is moved into the node and cleared in pSyncInfo. If opening
 * fails before that move, _error frees pSyncInfo->pFsm. After the move, the FSM handle is
 * freed by syncNodeClose.
 *
 * Returns the new node, or NULL on failure. On every failure path the partially-built
 * node is handed to syncNodeClose.
 */
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set early so the log lines and cfg-file helpers below report the real vgId instead of 0
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo (re-applied in case the cfg-file helpers touched it)
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  bool      updated = false;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
      updated = true;
    }
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  if (updated) {
    sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
      goto _error;
    }
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId: every replica except myIndex, in config order
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; from here on the node owns the FSM handle
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreReadFile(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commitIndex starts at the snapshot's lastApplyIndex when that is ahead of SYNC_INDEX_INVALID
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error (not a bare return) so the node and everything created above are released
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplMgrInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // FSM not yet moved into the node: the caller's handle is released here
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1096 1097
/*
 * Raise commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
 * commitIndex is never lowered. Nothing happens when no FSM or no FpGetSnapshotInfo
 * callback is registered.
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  SyncIndex snapIndex = snapshot.lastApplyIndex;
  if (snapIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapIndex;
  }
}

B
Benguang Zhao 已提交
1106
/*
 * Reconcile the node with its log store after open.
 *
 * Fails with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the log store's last index is
 * not -1 and does not sit right before the ring buffer's endIndex. Otherwise commitIndex
 * becomes the larger of its current value and the log store's commit index, and the
 * ring buffer is committed up to that index.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommit = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool mismatch = (lastVer != -1) && (endIndex != lastVer + 1);
  if (mismatch) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommit);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  int32_t rc = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  return (rc < 0) ? -1 : 0;
}

/*
 * Start raft on an opened node.
 *
 * With replicaNum == 1 the node advances its term, becomes leader and appends a no-op
 * entry (Raft 3.6.2: committing entries from previous terms). Otherwise it starts as a
 * follower. In both cases the ping timer is started afterwards.
 *
 * Returns the result of syncNodeStartPingTimer.
 */
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  if (ret != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return ret;
}

B
Benguang Zhao 已提交
1151
/*
 * Start a node in standby mode.
 *
 * The node is set to follower and its heartbeat timers are stopped. The elect timer is
 * re-armed with TIMER_MAX_MS (the original comment calls this "long enough"), and then
 * the ping timer is started.
 *
 * Returns -1 when restarting the elect timer or starting the ping timer reports a
 * negative value, otherwise the ping timer's result.
 */
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  if (syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  if (pingRet < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }
  return pingRet;
}

M
Minghao Li 已提交
1172
/*
 * First phase of closing a node.
 *
 * When the FSM exposes FpApplyQueueItems, this polls it every 20 ms until it reports 0
 * (or -1) queued items. It then stops the elect and heartbeat timers and cleans the
 * response manager. The new-node snapshot receiver is not touched here;
 * syncNodePostClose destroys it.
 *
 * A NULL node is ignored.
 */
void syncNodePreClose(SSyncNode* pSyncNode) {
  // guard up front: every step below dereferences pSyncNode
  if (pSyncNode == NULL) {
    return;
  }

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1207 1208 1209
/*
 * Second phase of closing a node: stop the new-node snapshot receiver if it is
 * running, destroy it, and clear pNewNodeReceiver.
 * A NULL node, or a node without a receiver, is ignored.
 */
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // log text previously said "preclose" although this runs in post-close
  sDebug("vgId:%d, snapshot receiver destroy while postclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);
  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1220
/* Free one heartbeat timer-data record (syncHbTimerStart allocates them with taosMemoryMalloc). */
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1221

M
Minghao Li 已提交
1222
/*
 * Destroy a sync node and what it owns, then free the node itself.
 *
 * Releases, in order: the replication manager, response manager, vote and index
 * managers, log store and log ring buffer. It then stops the ping, elect and heartbeat
 * timers, stops and destroys the snapshot senders and the new-node snapshot receiver,
 * and frees the FSM handle.
 *
 * A NULL node is ignored. syncNodeOpen's error path also calls this on a partially
 * initialized node.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // replication, response, vote and index bookkeeping
  syncNodeLogReplMgrDestroy(pSyncNode);
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  // raft log storage
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  // snapshot senders: stop the running ones, then destroy every allocated slot
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);
    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }
    snapshotSenderDestroy(pSender);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver used for new nodes
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }
    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // FSM handle (owned by the node since syncNodeOpen moved it in)
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  taosMemoryFree(pSyncNode);
}

1276
/* Snapshot strategy recorded in the node's raft config. */
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  ESyncStrategy strategy = pSyncNode->raftCfg.snapshotStrategy;
  return strategy;
}
M
Minghao Li 已提交
1277

M
Minghao Li 已提交
1278 1279 1280
// timer control --------------
/*
 * Arm the ping timer: callback FpPingTimerCB, period pingTimerMS, param = the node
 * pointer. Then copy pingTimerLogicClockUser into pingTimerLogicClock.
 * When the sync env is not initialized, only an error is logged. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Disarm the ping timer. pingTimerLogicClockUser is incremented before the timer is
 * stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

/*
 * Arm the elect timer to fire after `ms` milliseconds.
 *
 * This records the period in electTimerMS. It fills electTimerParam with the expected
 * execution time, the current electTimerLogicClock and the node pointer. The timer is
 * scheduled with the node's ref id (rid) as callback parameter.
 * When the sync env is not initialized, only an error is logged. Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/*
 * Disarm the elect timer. electTimerLogicClock is incremented before the timer is
 * stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

/*
 * Stop the elect timer and arm it again with a period of `ms` milliseconds.
 * The stop/start results are not propagated. Always returns 0.
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1335
/*
 * Re-arm the elect timer.
 * A standby node uses TIMER_MAX_MS. Any other node uses a random period from
 * syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
 */
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = TIMER_MAX_MS;
  if (!pSyncNode->raftCfg.isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  }

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1350
/*
 * Arm the node-level heartbeat timer: callback FpHeartbeatTimerCB, period
 * heartbeatTimerMS, param = the node pointer. Then copy heartbeatTimerLogicClockUser
 * into heartbeatTimerLogicClock.
 * When the sync env is not initialized, only an error is logged. A trace line is
 * emitted either way. Always returns 0.
 */
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  bool envReady = syncIsInit();
  if (!envReady) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1364
/*
 * Start the per-peer heartbeat timers. For every peer with a timer slot (as returned
 * by syncNodeGetHbTimer), syncHbTimerStart is called. Per-peer results are not
 * propagated. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }
  return 0;
}

M
Minghao Li 已提交
1382 1383
/*
 * Stop the per-peer heartbeat timers. For every peer with a timer slot (as returned
 * by syncNodeGetHbTimer), syncHbTimerStop is called. Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }
  return 0;
}

1401 1402 1403 1404 1405 1406
/* Stop, then start again, all per-peer heartbeat timers. Always returns 0. */
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1407 1408 1409 1410 1411 1412 1413 1414
/*
 * Send pMsg to the peer whose raft id address matches destRaftId->addr.
 *
 * The endpoint set is looked up among the node's peers. When both a send callback and
 * an endpoint are available, the payload goes through syncUtilMsgHtoN, the message is
 * marked noResp, and it is handed to syncSendMSg.
 *
 * Returns the callback's result, or -1 when the peer or callback is missing. On any
 * negative result the payload is freed with rpcFreeCont and terrno is set to
 * TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  SEpSet* epSet = NULL;
  for (int32_t i = 0; epSet == NULL && i < pNode->peersNum; ++i) {
    if (destRaftId->addr == pNode->peersId[i].addr) {
      epSet = &pNode->peersEpset[i];
    }
  }

  int32_t code = -1;
  bool    canSend = (pNode->syncSendMSg != NULL) && (epSet != NULL);
  if (canSend) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(epSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return code;
}

1433
/*
 * Report whether this node is a member of pCfg.
 *
 * Membership is checked twice: once by FQDN + port against myNodeInfo, and once by
 * building a raft id (SYNC_ADDR of each entry + this vgId) and comparing it with
 * myRaftId. The two answers are asserted to agree; the FQDN/port answer is returned.
 */
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  bool byEndpoint = false;
  for (int32_t i = 0; !byEndpoint && i < pCfg->replicaNum; ++i) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[i];
    byEndpoint = (strcmp(pInfo->nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0) &&
                 (pInfo->nodePort == pNode->myNodeInfo.nodePort);
  }

  bool byRaftId = false;
  for (int32_t i = 0; !byRaftId && i < pCfg->replicaNum; ++i) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[i]),
        .vgId = pNode->vgId,
    };
    byRaftId = syncUtilSameId(&candidate, &pNode->myRaftId);
  }

  ASSERT(byEndpoint == byRaftId);
  return byEndpoint;
}

1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
/*
 * Return true when pNewCfg differs from pOldCfg in any of:
 * - replica count,
 * - this node's index,
 * - the FQDN or port of a replica, compared position by position.
 */
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  bool changed = (pOldCfg->replicaNum != pNewCfg->replicaNum) || (pOldCfg->myIndex != pNewCfg->myIndex);

  for (int32_t k = 0; !changed && k < pOldCfg->replicaNum; ++k) {
    const SNodeInfo* pA = &pOldCfg->nodeInfo[k];
    const SNodeInfo* pB = &pNewCfg->nodeInfo[k];
    changed = (strcmp(pA->nodeFqdn, pB->nodeFqdn) != 0) || (pA->nodePort != pB->nodePort);
  }

  return changed;
}

M
Minghao Li 已提交
1474
/*
 * Apply a new replica configuration to this sync node.
 *
 * pNewConfig             configuration to switch to; copied into raftCfg.cfg.
 * lastConfigChangeIndex  log index of the config-change entry; stored as
 *                        raftCfg.lastConfigIndex and passed to syncAddCfgIndex().
 *
 * Nothing happens if pNewConfig equals the current config (syncIsConfigChanged).
 * Otherwise:
 * - the config is installed and the standby flag updated (normal if this
 *   node is in the new config, standby if it was dropped);
 * - if this node is in the new config, its own/peer/replica tables, quorum,
 *   index and vote managers, and snapshot senders are rebuilt, the config
 *   file is persisted, and the node re-enters its current role (leader:
 *   become leader again and append a no-op; otherwise: become follower);
 * - if it is not in the new config, only the config file is persisted.
 */
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  // membership is checked against myNodeInfo/myRaftId, which are not yet updated here
  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  sNInfo(pSyncNode, "begin do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save the current snapshot senders together with the replica ids they served,
    // so that senders of replicas which survive the change can be reused below
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear the table ...
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // ... then move each surviving sender into its new replica slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[k]);

          pSyncNode->senders[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new senders for every slot that was not filled by a reused one
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old senders that were not reused
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
}

M
Minghao Li 已提交
1645 1646
// raft state change --------------
/*
 * Adopt a newer term. If term is greater than the current term:
 * - persist it,
 * - step down to follower,
 * - clear the vote.
 * A term that is older or equal is ignored.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->raftStore.currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1656
/*
 * Persist term if it is newer than the current one. Unlike syncNodeUpdateTerm,
 * this leaves the node's role and vote untouched.
 */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->raftStore.currentTerm) return;

  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1662
/*
 * Step down in reaction to a peer's newTerm:
 * - newTerm older than the current term: ignored (trace only);
 * - newTerm newer: persist it, become follower, clear the vote;
 * - newTerm equal: become follower unless already a follower.
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm curTerm = pSyncNode->raftStore.currentTerm;

  if (newTerm < curTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (newTerm == curTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1688 1689
/*
 * Flush pending client responses held by this node's response manager via
 * syncRespCleanRsp(). Called from syncNodeBecomeFollower.
 */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1690
/*
 * Switch this node to the follower role.
 *
 * A former leader forgets the cached leader id. The node then:
 * - clears hbSlowNum,
 * - stops the heartbeat timer and resets the election timer,
 * - flushes pending client responses,
 * - invokes the FSM's FpBecomeFollowerCb (if any),
 * - invalidates minMatchIndex,
 * - resets the log buffer.
 *
 * debugStr is only used in the trace line.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }
  pSyncNode->hbSlowNum = 0;

  // role change: heartbeats stop, election timer restarts
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  syncNodeLeaderChangeRsp(pSyncNode);

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1741
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1742 1743
  pSyncNode->leaderTime = taosGetTimestampMs();

1744
  pSyncNode->becomeLeaderNum++;
1745
  pSyncNode->hbrSlowNum = 0;
1746

1747 1748 1749
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1750
  // state change
M
Minghao Li 已提交
1751
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1752 1753

  // set leader cache
M
Minghao Li 已提交
1754 1755
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1756
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
1757 1758 1759
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
1760
    ASSERT(code == 0);
1761
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1762 1763
  }

S
Shengliang Guan 已提交
1764
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1765 1766
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1767 1768 1769
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1770 1771 1772
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1773
#if 0
1774 1775
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1776
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1777
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1778 1779
      if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
1780
      }
1781
    }
1782
    (pMySender->privateTerm) += 100;
1783
  }
M
Minghao Li 已提交
1784
#endif
1785

1786
  // close receiver
M
Minghao Li 已提交
1787 1788
  if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
      snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
S
Shengliang Guan 已提交
1789
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
1790 1791
  }

M
Minghao Li 已提交
1792
  // stop elect timer
M
Minghao Li 已提交
1793
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1794

M
Minghao Li 已提交
1795 1796
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1797

M
Minghao Li 已提交
1798 1799
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1800

1801 1802 1803 1804 1805
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1806 1807 1808
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

B
Benguang Zhao 已提交
1809 1810 1811
  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

M
Minghao Li 已提交
1812
  // trace log
1813
  sNInfo(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1814 1815 1816
}

/*
 * Promote a candidate to leader if a majority granted its vote.
 *
 * On promotion it appends a no-op entry; a failure there is logged, not
 * propagated. It then logs the term, commit index and last log index.
 * Without a majority it logs an error and returns.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  int32_t noopRet = syncNodeAppendNoop(pSyncNode);
  if (noopRet < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLast >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "",
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLast);
}

M
Minghao Li 已提交
1838 1839
/* The node belongs to the mnode's sync group exactly when its vgId is 1. */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const int32_t mnodeVgId = 1;
  return pSyncNode->vgId == mnodeVgId;
}

M
Minghao Li 已提交
1840
/*
 * Reset the per-peer send tracking for all TSDB_MAX_REPLICA slots
 * (lastSendIndex = invalid, lastSendTime = 0). Always returns 0.
 */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    slot++;
  }
  return 0;
}

/*
 * Move a follower into the candidate state and log term, commit index and
 * last log index. Only the state field is changed here.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "follower to candidate");
}

/*
 * Demote a leader to follower (via syncNodeBecomeFollower) and log term,
 * commit index and last log index.
 */
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "leader to follower");
}

/*
 * Demote a candidate to follower (via syncNodeBecomeFollower) and log term,
 * commit index and last log index.
 */
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLast);
  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1879 1880
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1881
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
S
Shengliang Guan 已提交
1882
  ASSERT(term == pSyncNode->raftStore.currentTerm);
1883 1884
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);
M
Minghao Li 已提交
1885

S
Shengliang Guan 已提交
1886
  raftStoreVote(pSyncNode, pRaftId);
M
Minghao Li 已提交
1887 1888
}

M
Minghao Li 已提交
1889
// simulate get vote from outside
M
Minghao Li 已提交
1890
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1891
  syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1892

S
Shengliang Guan 已提交
1893 1894
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1895
  if (ret != 0) return;
M
Minghao Li 已提交
1896

S
Shengliang Guan 已提交
1897
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1898 1899
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
S
Shengliang Guan 已提交
1900
  pMsg->term = pSyncNode->raftStore.currentTerm;
M
Minghao Li 已提交
1901 1902 1903 1904
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1905
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1906 1907
}

M
Minghao Li 已提交
1908
// return if has a snapshot
M
Minghao Li 已提交
1909 1910
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1911
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1912 1913
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1914 1915 1916 1917 1918 1919 1920
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1921 1922
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1923
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1924
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1925 1926
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1927 1928 1929 1930 1931 1932 1933
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1934 1935
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1936 1937
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1938 1939
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1940
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1941 1942
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1943 1944
    }

M
Minghao Li 已提交
1945 1946 1947
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1948 1949 1950 1951
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1952
  } else {
M
Minghao Li 已提交
1953 1954
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1955
  }
M
Minghao Li 已提交
1956

M
Minghao Li 已提交
1957 1958 1959 1960 1961 1962 1963
  return lastTerm;
}

/*
 * Fill *pLastIndex and *pLastTerm with the last index and term, taking the
 * snapshot into account (syncNodeGetLastIndex / syncNodeGetLastTerm).
 * Always returns 0.
 */
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  term = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = idx;
  *pLastTerm = term;
  return 0;
}
M
Minghao Li 已提交
1966

M
Minghao Li 已提交
1967
// return append-entries first try index
M
Minghao Li 已提交
1968 1969 1970 1971 1972
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1973 1974
// if index > 0, return index - 1
// else, return -1
1975 1976 1977 1978 1979 1980 1981 1982 1983
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1984 1985 1986 1987
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1988 1989 1990 1991 1992 1993 1994 1995 1996
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1997 1998 1999
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

2000
  SSyncRaftEntry* pPreEntry = NULL;
2001 2002 2003 2004 2005 2006 2007
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

2008
    pSyncNode->pLogStore->cacheHit++;
2009 2010 2011
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
2012
    pSyncNode->pLogStore->cacheMiss++;
2013 2014 2015 2016
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2017 2018 2019 2020 2021 2022

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2023
  if (code == 0) {
2024
    ASSERT(pPreEntry != NULL);
2025
    preTerm = pPreEntry->term;
2026 2027 2028 2029

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2030
      syncEntryDestroy(pPreEntry);
2031 2032
    }

2033 2034
    return preTerm;
  } else {
2035 2036 2037 2038
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2039 2040 2041 2042
      }
    }
  }

2043
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2044
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2045 2046
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2047 2048 2049 2050

/*
 * Fill *pPreIndex and *pPreTerm with the index and term of the entry that
 * precedes index (see syncNodeGetPreIndex / syncNodeGetPreTerm).
 * Always returns 0.
 */
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);

  const int32_t ok = 0;
  return ok;
}

M
Minghao Li 已提交
2055
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2056
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2057

S
Shengliang Guan 已提交
2058 2059 2060
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2061
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2062 2063
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2064
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2065 2066
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2067
    }
M
Minghao Li 已提交
2068

M
Minghao Li 已提交
2069
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2070 2071
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2072
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2073 2074
      rpcFreeCont(rpcMsg.pCont);
      return;
2075
    }
M
Minghao Li 已提交
2076

S
Shengliang Guan 已提交
2077
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2078
  }
M
Minghao Li 已提交
2079 2080
}

M
Minghao Li 已提交
2081
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2082
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2083

M
Minghao Li 已提交
2084 2085
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2086

2087
  if (pNode == NULL) return;
M
Minghao Li 已提交
2088 2089 2090 2091 2092

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2093

2094
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2095 2096 2097 2098
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2099

S
Shengliang Guan 已提交
2100
  SRpcMsg rpcMsg = {0};
2101 2102
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2103

S
Shengliang Guan 已提交
2104
  if (code != 0) {
M
Minghao Li 已提交
2105
    sError("failed to build elect msg");
M
Minghao Li 已提交
2106
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2107
    return;
M
Minghao Li 已提交
2108 2109
  }

S
Shengliang Guan 已提交
2110
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2111
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2112 2113 2114

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2115
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2116
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2117
    syncNodeRelease(pNode);
2118
    return;
M
Minghao Li 已提交
2119
  }
M
Minghao Li 已提交
2120 2121

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2122 2123
}

M
Minghao Li 已提交
2124
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2125
  if (!syncIsInit()) return;
2126

S
Shengliang Guan 已提交
2127 2128 2129 2130
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2131
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2132 2133 2134
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2135
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2136
        return;
2137
      }
M
Minghao Li 已提交
2138

2139
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2140 2141
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2142
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2143 2144
        rpcFreeCont(rpcMsg.pCont);
        return;
2145
      }
S
Shengliang Guan 已提交
2146 2147 2148 2149

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2150
    } else {
S
Shengliang Guan 已提交
2151 2152
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2153
    }
M
Minghao Li 已提交
2154 2155 2156
  }
}

2157
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2158
  int64_t hbDataRid = (int64_t)param;
2159
  int64_t tsNow = taosGetTimestampMs();
2160

2161 2162
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2163
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2164 2165
    return;
  }
2166

2167
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2168
  if (pSyncNode == NULL) {
2169
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2170
    sError("hb timer get pSyncNode NULL");
2171 2172 2173 2174 2175 2176 2177 2178
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2179
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2180 2181 2182
    return;
  }

M
Minghao Li 已提交
2183
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2184 2185
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2186
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2187 2188 2189
    return;
  }

M
Minghao Li 已提交
2190
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2191 2192

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2193 2194 2195
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2196
    if (timerLogicClock == msgLogicClock) {
2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
S
Shengliang Guan 已提交
2213
        pSyncMsg->term = pSyncNode->raftStore.currentTerm;
2214 2215 2216
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2217
        pSyncMsg->timeStamp = tsNow;
2218 2219 2220 2221 2222 2223

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2224 2225
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2226 2227 2228 2229 2230 2231 2232 2233
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2234 2235
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2236 2237
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2238 2239 2240 2241
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2242
    } else {
M
Minghao Li 已提交
2243 2244
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2245 2246
    }
  }
2247 2248 2249

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2250 2251
}

2252 2253 2254 2255 2256
/*
 * Propose a no-op entry through the node's message queue.
 *
 * The entry is built at the log store's write index for the current term, wrapped as a
 * client request and handed to syncEqMsg.
 *
 * Only valid on the leader. On any other node, terrno is set to TSDB_CODE_SYN_NOT_LEADER
 * and -1 is returned. (The previous check was inverted and rejected exactly the leader.)
 *
 * Returns 0 on success, otherwise the failing step's error. If enqueueing fails, the
 * request payload is freed here, as the heartbeat timers do.
 */
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->raftStore.currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestroy(pEntry);  // the request holds its own copy of the entry
  if (code != 0) {
    sError("vgId:%d, failed to build noop client request since %s", pNode->vgId, terrstr());
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2276 2277
/* LRU-cache deleter used by syncCacheEntry: the key is ignored and the cached value is freed. */
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2278 2279 2280 2281
/*
 * Insert a raft entry into the log store's LRU cache, keyed by the entry's index.
 *
 * The cache is charged sizeof(*pEntry) + dataLen, and the entry is inserted with low
 * priority. deleteCacheEntry is registered as its deleter. h is passed straight to
 * taosLRUCacheInsert; callers release any handle it yields.
 *
 * Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
 */
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2293
/*
 * Append a raft entry to the node's log buffer.
 *
 * pEntry is consumed. It is destroyed here when rejected; otherwise it is left with the
 * log buffer and not freed here.
 *
 * Rejection cases:
 *   - the payload is shorter than SMsgHead;
 *   - the log buffer refuses the entry. terrno is then set to TSDB_CODE_SYN_BUFFER_FULL
 *     and the FSM is notified through syncLogFsmExecute with that code.
 *
 * After a successful append the match index is advanced. Multi-replica groups return
 * right away. A single replica updates its commit index and commits the buffer in place.
 *
 * Returns 0 on success, -1 on rejection or commit failure.
 */
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // an entry too short to hold a message head cannot be a valid client request
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  bool appended = (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) >= 0);
  if (!appended) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // advance the match index (replicating where needed)
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  if (ths->replicaNum <= 1) {
    // single replica: nothing to wait for, commit immediately
    (void)syncNodeUpdateCommitIndex(ths, matchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2334
/*
 * Report whether heartbeat replies from at least a quorum of peers are overdue.
 *
 * A peer counts as overdue when its last recorded reply time, taken from pMatchIndex,
 * is more than tsHeartbeatTimeout ms in the past. Peers with no recorded time (0 or -1)
 * are not counted. This always returns false for a single-replica node.
 */
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int64_t now = taosGetTimestampMs();
  int32_t overdue = 0;
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
    bool    hasRecv = (lastRecv != 0 && lastRecv != -1);
    if (hasRecv && now - lastRecv > tsHeartbeatTimeout) {
      ++overdue;
    }
  }

  return overdue >= pSyncNode->quorum;
}

2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375
/* True when any of the node's first replicaNum snapshot senders exists and has started; false for a NULL node. */
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[i];
    if (pSender != NULL && pSender->start) return true;
  }
  return false;
}

/* True when the node exists, has a new-node snapshot receiver, and that receiver has started. */
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2376
/*
 * Append a no-op entry, stamped with the current term, at the end of the log buffer.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry
 * cannot be built. When syncNodeAppend rejects the entry, its error is now propagated
 * instead of being discarded.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->raftStore.currentTerm;

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // syncNodeAppend takes ownership of pEntry on every path (it frees it when rejecting)
  return syncNodeAppend(ths, pEntry);
}

/*
 * Legacy no-op append path that writes straight to the log store.
 *
 * On the leader, the entry is appended at the store's write index and then offered to the
 * LRU cache. If the cache hands back a handle, the cache keeps the entry and the handle is
 * released. In every other case, including a non-leader node, the entry is destroyed here.
 *
 * Returns 0 on success. Returns -1 when the entry cannot be built (terrno =
 * TSDB_CODE_OUT_OF_MEMORY, instead of asserting) or cannot be appended. An entry that
 * fails to append is now freed instead of leaked.
 */
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->raftStore.currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      syncEntryDestroy(pEntry);  // not cached yet, so we still own it
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2419 2420
/*
 * Handle a SyncHeartbeat from a peer and answer it with a SyncHeartbeatReply.
 *
 * Same term and this node is not leader:
 *   - record the receive time for the sender;
 *   - reset the election timer;
 *   - adopt the sender's minMatchIndex;
 *   - when follower, enqueue a SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the
 *     sender's commit index and term.
 *
 * Sender's term >= ours and this node is not follower: enqueue a SYNC_LOCAL_CMD_STEP_DOWN
 * local command.
 *
 * Local commands go through syncEqMsg. When a command is not enqueued (build failure,
 * missing queue callback, or enqueue error), its payload is freed here. A queued message
 * is never read again, because its consumer may already have freed it.
 *
 * Returns 0, or -1 when the reply message could not be built (the heartbeat itself is still
 * processed).
 */
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  // Build the reply up front so it reflects the state at receive time.
  SRpcMsg             rpcMsg = {0};
  SyncHeartbeatReply* pMsgReply = NULL;
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) == 0) {
    pMsgReply = rpcMsg.pCont;
    pMsgReply->destId = pMsg->srcId;
    pMsgReply->srcId = ths->myRaftId;
    pMsgReply->term = ths->raftStore.currentTerm;
    pMsgReply->privateTerm = 8864;  // magic number
    pMsgReply->startTime = ths->startTime;
    pMsgReply->timeStamp = tsMs;
  } else {
    sError("vgId:%d, failed to build heartbeat reply since %s", ths->vgId, terrstr());
  }

  if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // Follower commit is not done inline; it is handed off as a local command
      // (presumably handled by syncNodeOnLocalCmd).
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, failed to build fc-commit msg since %s", ths->vgId, terrstr());
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->commitIndex = pMsg->commitIndex;
        pSyncMsg->currentTerm = pMsg->term;
        SyncIndex fcIndex = pSyncMsg->commitIndex;  // copied: pSyncMsg must not be read once queued

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          rpcFreeCont(rpcMsgLocalCmd.pCont);  // no queue to hand it to
        }
      }
    }
  }

  if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // Step-down is also deferred through a local command rather than done inline.
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, failed to build step-down msg since %s", ths->vgId, terrstr());
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->currentTerm = pMsg->term;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      SyncTerm newTerm = pSyncMsg->currentTerm;  // copied: pSyncMsg must not be read once queued

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, newTerm);
        }
      } else {
        rpcFreeCont(rpcMsgLocalCmd.pCont);  // no queue to hand it to
      }
    }
  }

  if (pMsgReply == NULL) return -1;

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2503
/*
 * Handle a SyncHeartbeatReply from a peer.
 *
 * Steps:
 *   - look up the peer's log replication manager;
 *   - log the reply together with the delay since its timestamp;
 *   - record the receive time in the match-index manager;
 *   - delegate to syncLogReplMgrProcessHeartbeatReply.
 *
 * Returns -1 if no replication manager exists for the sender, otherwise the result of
 * syncLogReplMgrProcessHeartbeatReply.
 */
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" zero-pads the address to 16 hex digits (the old "0x016%" printed a literal "016")
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2523
/*
 * Legacy heartbeat-reply handler.
 *
 * Logs the reply together with the delay since its timestamp, then records when it
 * arrived in the match-index manager. Always returns 0.
 */
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* pTraceId = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(pTraceId, tbuf);

  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             now = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, now - pReply->timeStamp, tbuf);

  // the last reply time is what decides whether this peer is still considered alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, now);
  return 0;
}

S
Shengliang Guan 已提交
2539 2540
/*
 * Execute a local command message.
 *
 * SYNC_LOCAL_CMD_STEP_DOWN steps the node down to the command's term.
 *
 * SYNC_LOCAL_CMD_FOLLOWER_CMT commits the log buffer. It first adopts the command's
 * commit index, but only if the last matched entry's term equals the command's term.
 *
 * Any other command is logged as an error. Always returns 0.
 */
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->currentTerm);
    return 0;
  }

  if (pCmd->cmd != SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    sError("error local cmd");
    return 0;
  }

  if (syncLogBufferGetLastMatchTerm(ths->pLogBuf) == pCmd->currentTerm) {
    (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
  }

  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
           ths->commitIndex);
  }
  return 0;
}

M
Minghao Li 已提交
2562 2563 2564 2565 2566 2567 2568 2569 2570 2571
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2572

2573
/*
 * Turn a client request into a raft entry at the end of the log buffer and append it.
 *
 * Only the leader accepts requests. Other nodes now fail fast: terrno is set to
 * TSDB_CODE_SYN_NOT_LEADER, -1 is returned, and no entry is built.
 *
 * TDMT_SYNC_CLIENT_REQUEST messages are decoded from their payload; any other message type
 * is wrapped directly.
 *
 * pRetIndex : if non-NULL, receives the index assigned to the entry. It is written before
 *             the append is attempted.
 *
 * Returns -1 if the entry cannot be built, otherwise syncNodeAppend's result.
 */
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = ths->raftStore.currentTerm;
  SSyncRaftEntry* pEntry = NULL;
  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  // syncNodeAppend takes ownership of pEntry on every path (it frees it when rejecting)
  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2606 2607 2608
/*
 * Human-readable name of a sync role.
 * Returns a static string; values outside the known states map to "unknown".
 */
const char* syncStr(ESyncState state) {
  const char* name = "unknown";
  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    default:
      break;
  }
  return name;
}
2622

2623
#if 0
/*
 * Disabled (compiled out).
 *
 * Runs only on a follower that has finished restoring, and only for an entry that is not
 * behind the current term or the last log index.
 *
 * If the leader-transfer request names this node as the new leader (by raft id, or by
 * fqdn and port), it restarts the election timer at 1 ms. In either case it then invokes
 * the FSM's FpLeaderTransferCb, if one is set. Returns 0.
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->raftStore.currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool idMatches = syncUtilSameId(&pTransfer->newLeaderId, &ths->myRaftId);
  bool addrMatches = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                     pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (idMatches || addrMatches) {
    // reset elect timer now!
    int32_t ret = syncNodeRestartElectTimer(ths, 1);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb == NULL) return 0;

  SFsmCbMeta cbMeta = {
      .code = 0,
      .currentTerm = ths->raftStore.currentTerm,
      .flag = 0,
      .index = pEntry->index,
      .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
      .isWeak = pEntry->isWeak,
      .seqNum = pEntry->seqNum,
      .state = ths->state,
      .term = pEntry->term,
  };
  ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);

  return 0;
}
#endif

2690
/*
 * Locate this node in a new config and store its position in pNewCfg->myIndex.
 * Returns 0 when this node's raft id is among the config's replicas, -1 otherwise
 * (myIndex is then left untouched).
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;
  while (slot < pNewCfg->replicaNum) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[slot]),
        .vgId = ths->vgId,
    };
    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }
    ++slot;
  }
  return -1;
}

2706 2707 2708 2709
/* True for a single-replica node outside vgroup 1 handling a user-commit message type. */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2710
/* True when pRaftId matches one of the node's first replicaNum replica ids. */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int32_t i = 0; !found && i < ths->replicaNum; ++i) {
    found = syncUtilSameId(&ths->replicasId[i], pRaftId);
  }
  return found;
}

/*
 * Snapshot sender for the replica matching pDestId, or NULL if none matches.
 * The scan runs from the back so that, as before, the last matching slot wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t i = ths->replicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return ths->senders[i];
    }
  }
  return NULL;
}
M
Minghao Li 已提交
2728

2729 2730
/*
 * Per-peer heartbeat timer slot for the replica matching pDestId, or NULL if none matches.
 * The scan runs from the back so that, as before, the last matching slot wins.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  for (int32_t i = ths->replicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return &ths->peerHeartbeatTimerArr[i];
    }
  }
  return NULL;
}

M
Minghao Li 已提交
2739 2740
/*
 * Peer-state slot for the replica matching pDestId, or NULL if none matches.
 * The scan runs from the back so that, as before, the last matching slot wins.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t i = ths->replicaNum - 1; i >= 0; --i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      return &ths->peerStates[i];
    }
  }
  return NULL;
}

/*
 * Decide whether an AppendEntries message should be sent to pDestId.
 *
 * Returns false when the peer is unknown. It also returns false when the same next index
 * (prevLogIndex + 1) was already sent less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. In
 * every other case it returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextSendIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextSendIndex);
  bool sentRecently = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);
  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779
/*
 * Check whether the node may start a configuration change. Each refusal is logged.
 *
 * The change is refused when any of the following holds:
 *   - a change is already in progress;
 *   - the commit index has not caught up with the last log index (checked only once
 *     something has been committed);
 *   - any peer's snapshot sender is running.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitted = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitted && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[i]);
    if (pSender != NULL && pSender->start) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}