syncMain.c 99.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
// Opens a sync node described by pSyncInfo and registers it.
// Returns the node's reference id on success, -1 if the node cannot be
// opened or registered.
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // Registration failed: tear the freshly opened node down again.
    syncNodeClose(pNode);
    return -1;
  }

  // Copy the timer settings and message callbacks supplied by the caller.
  pNode->msgcb = pSyncInfo->msgcb;
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;
  pNode->electBaseLine = pSyncInfo->electMs;
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
B
Benguang Zhao 已提交
88 89 90 91 92
    sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid);
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

S
Shengliang Guan 已提交
127 128 129
// A new configuration is accepted only if this node is part of it and the
// replica count changes by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int replicaDelta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return replicaDelta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
132
// Applies a new replica configuration to the sync node referenced by rid.
//
// Returns 0 on success. Returns -1 if the node cannot be acquired, or if the
// new configuration is rejected by syncNodeCheckNewConfig (terrno is then set
// to TSDB_CODE_SYN_NEW_CONFIG_ERROR).
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: pSyncNode must not be touched
    // after syncNodeRelease().
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    // Set terrno last so neither the log call nor the release can clobber it.
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // Re-initialise every per-peer heartbeat timer slot against the new
    // replica ids, with the heartbeat timer stopped while doing so.
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
160

S
Shengliang Guan 已提交
161 162 163 164
// Dispatches a sync message to the handler matching pMsg->msgType.
//
// Returns the handler's result code. Returns -1 if the sync environment is
// not initialised, the node cannot be acquired, or the message type is not
// handled here (terrno is then set to TSDB_CODE_MSG_NOT_PROCESSED).
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      break;
  }

  // Log while the reference is still held: pSyncNode must not be touched
  // after syncNodeRelease().
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
215
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
216
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
217
  if (pSyncNode == NULL) return -1;
218

S
Shengliang Guan 已提交
219
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
220
  syncNodeRelease(pSyncNode);
221 222 223
  return ret;
}

224
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
225
  SSyncNode* pNode = syncNodeAcquire(rid);
226
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
227 228

  SRpcMsg rpcMsg = {0};
229
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
230 231 232
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
233
  if (ret == 1) {
234
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
235
    rpcSendResponse(&rpcMsg);
236 237
    return 0;
  } else {
238
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
239
    return -1;
240
  }
S
Shengliang Guan 已提交
241 242
}

M
Minghao Li 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
// Returns the smallest match index recorded for any peer, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // The first peer seeds the minimum; later peers can only lower it.
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

259
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
260
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
261
  if (pSyncNode == NULL) {
262
    sError("sync begin snapshot error");
263 264
    return -1;
  }
265

266 267
  int32_t code = 0;

M
Minghao Li 已提交
268
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
269 270 271
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
272 273 274
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
275 276 277
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
278 279
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
280
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
281 282 283
      return 0;
    }

M
Minghao Li 已提交
284 285 286
    goto _DEL_WAL;

  } else {
287 288 289 290 291 292 293 294 295 296 297 298
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
313 314 315 316
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
317 318
            } while (0);

S
Shengliang Guan 已提交
319
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
320 321 322 323 324 325
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
326 327 328
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
329
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
330 331 332 333
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
334
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
335
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
336 337 338
        return 0;

      } else {
S
Shengliang Guan 已提交
339
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
340
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
341 342 343 344 345 346 347 348 349
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
350 351 352
    }
  }

M
Minghao Li 已提交
353
_DEL_WAL:
354

M
Minghao Li 已提交
355
  do {
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
376

M
Minghao Li 已提交
377
      } else {
378 379
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
380
      }
381
    }
M
Minghao Li 已提交
382
  } while (0);
383

S
Shengliang Guan 已提交
384
  syncNodeRelease(pSyncNode);
385 386 387 388
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
389
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
390
  if (pSyncNode == NULL) {
391
    sError("sync end snapshot error");
392 393 394
    return -1;
  }

395 396 397 398
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
399
    if (code != 0) {
400
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
401
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
402 403
      return -1;
    } else {
S
Shengliang Guan 已提交
404
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
405 406
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
407
  }
408

S
Shengliang Guan 已提交
409
  syncNodeRelease(pSyncNode);
410 411 412
  return code;
}

M
Minghao Li 已提交
413
// Public wrapper: calls syncNodeStepDown with newTerm on the node referenced
// by rid. Returns 0 on success, -1 if the node cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

425
// Tells whether this node may serve reads.
//
// Only a leader qualifies. A leader whose restore has finished is ready at
// once. Otherwise it is ready only when the apply queue is empty and the last
// log entry is a no-op entry of the current term.
// On a negative answer terrno is set to the reason.
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  bool ready = false;
  bool applyQueueEmpty = pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm);

  if (applyQueueEmpty && !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SSyncRaftEntry* pLast = NULL;
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      hCache = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
    int32_t         code = 0;

    if (hCache != NULL) {
      // Cache hit: the entry is borrowed from the cache.
      pLast = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hCache);

      pSyncNode->pLogStore->cacheHit++;
      sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pLast->bytes, pLast);
    } else {
      // Cache miss: fetch the entry from the log store.
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);

      code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pLast);
    }

    if (code == 0 && pLast != NULL) {
      ready = (pLast->originalRpcType == TDMT_SYNC_NOOP) && (pLast->term == pSyncNode->pRaftStore->currentTerm);

      // Give the entry back the way it was obtained.
      if (hCache != NULL) {
        taosLRUCacheRelease(pCache, hCache, false);
      } else {
        syncEntryDestroy(pLast);
      }
    }
  }

  if (!ready) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }

  return ready;
}

// Public wrapper: evaluates syncNodeIsReadyForRead on the node referenced by
// rid. Returns false if the node cannot be acquired.
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    bool canRead = syncNodeIsReadyForRead(pNode);
    syncNodeRelease(pNode);
    return canRead;
  }

  sError("sync ready for read error");
  return false;
}
M
Minghao Li 已提交
500

501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
// Public wrapper: returns syncNodeSnapshotSending for the node referenced by
// rid, or false if the node cannot be acquired.
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);
  return sending;
}

// Public wrapper: returns syncNodeSnapshotRecving for the node referenced by
// rid, or false if the node cannot be acquired.
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);
  return receiving;
}

M
Minghao Li 已提交
523 524
// Picks a peer and proposes a leader transfer to it.
//
// Fails with TSDB_CODE_SYN_ONE_REPLICA when there are no peers. Returns 0
// without doing anything when this node is not the leader or has a single
// replica. With exactly two peers, the one with the larger match index is
// chosen; otherwise the first peer is used.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (second > first) {
      target = 1;
    }
  }

  return syncNodeLeaderTransferTo(pSyncNode, (pSyncNode->peersNodeInfo)[target]);
}

M
Minghao Li 已提交
546 547
// Proposes a leader-transfer entry naming newLeader as the next leader.
//
// Returns -1 with terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has a
// single replica, -1 when the transfer message cannot be built, otherwise the
// result of syncNodePropose().
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // The message body is written through rpcMsg.pCont below, so a failed build
  // must stop here instead of dereferencing a NULL content pointer.
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg", pSyncNode->vgId);
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  // The content buffer is freed here whatever the propose result was.
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

568 569
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
570

S
Shengliang Guan 已提交
571
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
572 573 574
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
575 576 577 578 579
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
580
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
581 582
  }

583
  return state;
M
Minghao Li 已提交
584 585
}

586
#if 0
// Disabled code, kept for reference: snapshot metadata getters.

// Fills pSnapshot with the term and config index of the log entry at `index`.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) {
      syncEntryDestroy(pEntry);
    }
    syncNodeRelease(pNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

  syncEntryDestroy(pEntry);
  syncNodeRelease(pNode);
  return 0;
}

// Reports the last config index stored in the node's raft configuration.
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  sMeta->lastConfigIndex = pNode->pRaftCfg->lastConfigIndex;

  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pNode->vgId, pNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}

// Reports the largest recorded config index that does not exceed snapshotIndex
// (falling back to the first recorded config index).
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  ASSERT(pNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = (pNode->pRaftCfg->configIndexArr)[0];

  for (int32_t i = 0; i < pNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = (pNode->pRaftCfg->configIndexArr)[i];
    if (candidate > best && candidate <= snapshotIndex) {
      best = candidate;
    }
  }
  sMeta->lastConfigIndex = best;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
#endif
657

658
// Returns the largest recorded config index that is greater than the first
// recorded one and not beyond snapshotLastApplyIndex; if none qualifies, the
// first recorded config index is returned.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = (pSyncNode->pRaftCfg->configIndexArr)[0];

  // Slot 0 seeds `best` and can never be greater than itself, so the scan
  // starts at slot 1.
  for (int32_t slot = 1; slot < pSyncNode->pRaftCfg->configIndexCount; ++slot) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[slot];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

674 675
// Fills pEpSet with the endpoints of every replica in the node's raft
// configuration and points inUse at the replica following this node.
// pEpSet->numOfEps is left at 0 if the node cannot be acquired.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pNode->pRaftCfg->cfg.replicaNum; ++idx) {
    SEp* pTarget = &pEpSet->eps[idx];
    tstrncpy(pTarget->fqdn, pNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pTarget->port = (pNode->pRaftCfg->cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pTarget->fqdn, pTarget->port);
  }

  // Start retries at the replica after this one, wrapping around.
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
695
// Public wrapper: proposes pMsg on the node referenced by rid.
// Returns -1 if the node cannot be acquired, otherwise the result of
// syncNodePropose().
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
    syncNodeRelease(pNode);
    return code;
  }

  sError("sync propose error");
  return -1;
}
M
Minghao Li 已提交
706

S
Shengliang Guan 已提交
707
// Proposes pMsg for replication on this node.
//
// Rejected with -1 (terrno set) when the node is not leader, when it has not
// finished restoring (except for vgId 1), or when heartbeat replies have
// timed out.
//
// Optimized single-replica path: the request is handed directly to
// syncNodeOnClientRequest; on success the apply index/term are stored in
// pMsg->info.conn and 1 is returned.
//
// Normal path: a response stub is registered, the request is wrapped into a
// client-request message and enqueued through syncEqMsg. The enqueue result
// is returned and *seq (if given) receives the stub's sequence number.
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised because the failure branch below logs it even when the
    // callee returned before assigning an index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // Register the response stub first so the sequence number can travel with
  // the request; it is removed again on every failure below.
  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  if (seq != NULL) *seq = seqNum;
  return code;
}

S
Shengliang Guan 已提交
769
// Resets one per-peer heartbeat timer slot to its idle state.
//   pSyncNode  - owning node; only its hbBaseLine is read (used as the timer interval)
//   pSyncTimer - slot to initialize
//   destId     - raft id of the peer this slot belongs to
// No timer is armed here. Always returns 0.
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // who the heartbeat goes to and which callback is used for the timer
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  // nothing armed yet
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;

  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
780
// Arms the heartbeat timer of one peer.
//
// Looks up the timer data registered under pSyncTimer->hbDataRid; if none is found a new
// SSyncHbTimerData is allocated and registered via syncHbTimerDataAdd. The data is then filled
// with the node rid, the timer slot, the destination peer, the current logic clock and the
// expected execution time, and its rid is handed to taosTmrReset as the callback parameter.
// The timer interval passed to taosTmrReset is timerMS / HEARTBEAT_TICK_NUM.
//
// Returns 0 on success, and also when the sync env is not initialized (an error is logged and
// nothing is armed). Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the timer data cannot
// be allocated.
//
// NOTE(review): the reference obtained through syncHbTimerDataAcquire is not released in this
// function - confirm that the release happens elsewhere.
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int32_t ret = 0;
  int64_t tsNow = taosGetTimestampMs();
  if (syncIsInit()) {
    SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
    if (pData == NULL) {
      pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
      if (pData == NULL) {
        // previously the NULL result was dereferenced on the next line
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
        return -1;
      }
      pData->rid = syncHbTimerDataAdd(pData);
    }
    pSyncTimer->hbDataRid = pData->rid;
    pSyncTimer->timeStamp = tsNow;

    // every field of a freshly malloc'ed pData is assigned below
    pData->syncNodeRid = pSyncNode->rid;
    pData->pTimer = pSyncTimer;
    pData->destId = pSyncTimer->destId;
    pData->logicClock = pSyncTimer->logicClock;
    pData->execTime = tsNow + pSyncTimer->timerMS;

    taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
                 syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  } else {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
  }
  return ret;
}

S
Shengliang Guan 已提交
806
// Disarms the heartbeat timer of one peer and drops its registered timer data.
// pSyncNode is not used. Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  // syncHbTimerStart copies the logic clock into the timer data, so advancing it here
  // presumably lets an in-flight callback notice it is stale - confirm in the callback
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

816
// Re-seeds the log store from the FSM snapshot when the stored log range does not connect to
// the snapshot's last applied index.
// A restore is requested when the log ends before lastApplyIndex, or starts after
// lastApplyIndex + 1. Returns 0 when nothing had to be done or the restore succeeded, -1 when
// syncLogRestoreFromSnapshot reported a failure.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapshot = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  SyncIndex commitIndex = snapshot.lastApplyIndex;

  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // the log already reaches the snapshot point and starts no later than the entry after it
  if (lastVer >= commitIndex && firstVer <= commitIndex + 1) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64,
           pNode->vgId, terrstr(), lastVer, commitIndex);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
836
// open/close --------------
S
Shengliang Guan 已提交
837 838
// Allocates and initializes a sync (raft) node from the caller-supplied SSyncInfo.
//
// Steps performed here: make sure the data directory exists, create raft_config.json (or
// reconcile an existing one with pSyncInfo->syncCfg), copy paths and message callbacks, open the
// raft config and raft store, create the vote/index managers, the log store and the log buffer,
// initialize the timer bookkeeping, create the snapshot senders/receiver and the replication
// manager. No timer is started here; that is left to syncNodeStart.
//
// Side effects on pSyncInfo:
//   - pSyncInfo->syncCfg may be overwritten with the configuration read from the cfg file.
//   - pSyncInfo->pFsm is moved into the node and set to NULL. On failure the FSM is freed,
//     either through pSyncInfo (not moved yet) or by syncNodeClose (already moved).
//
// Returns the new node, or NULL on any failure. Every failure path goes through _error so that
// whatever was already created is released by syncNodeClose.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // set the vgId first, the log calls below print it
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // pRaftCfg has not been opened on this path, so it must not be dereferenced here. The
      // file was just written from pSyncInfo->syncCfg, so there is nothing newer to copy back.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // the cfg file is reopened below once the node paths are set up
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sInfo("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // ownership of the FSM moves to the node from here on
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts from the snapshot's last applied index when an FSM provides one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // go through _error like every other failure, a bare return would leak the node
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  syncNodeLogReplMgrInit(pSyncNode);

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // an FSM that was not yet moved into the node is still owned by pSyncInfo
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  // syncNodeClose accepts NULL and releases whatever was created so far
  syncNodeClose(pSyncNode);
  return NULL;
}

M
Minghao Li 已提交
1154 1155
// Raises the node's commit index to the snapshot's last applied index when the snapshot is
// ahead. Does nothing if no FSM or no FpGetSnapshotInfo callback is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snap = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  // only ever move the commit index forward
  if (snap.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snap.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1164
// Checks that the in-memory log buffer lines up with the log store, then commits the buffer up
// to the larger of the node's commit index and the log store's commit index.
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_WAL_LOG_INCOMPLETE when the buffer's
// endIndex is not lastVer + 1 (for lastVer != -1), or -1 when syncLogBufferCommit fails.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;

  bool hasLog = (lastVer != -1);
  if (hasLog && endIndex != lastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "",
           pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
    return -1;
  }

  ASSERT(endIndex == lastVer + 1);

  // take whichever commit index is further ahead
  if (pSyncNode->commitIndex > commitIndex) {
    commitIndex = pSyncNode->commitIndex;
  }

  return (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) ? -1 : 0;
}

// Starts raft on an opened node.
// With a single replica the node moves to the next term, becomes leader and appends a noop
// entry; otherwise it starts as a follower. The ping timer is started in both cases.
// Returns the result of syncNodeStartPingTimer.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
  return ret;
}

// Older start routine. Same as syncNodeStart, except that the single-replica path additionally
// calls syncMaybeAdvanceCommitIndex after appending the noop entry, and nothing is returned.
void syncNodeStartOld(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

B
Benguang Zhao 已提交
1225
// Starts the node in stand-by mode: follower state, heartbeat timers stopped, election timer
// restarted with TIMER_MAX_MS, ping timer started.
// Returns the result of syncNodeStartPingTimer.
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(ret == 0);

  ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
  return ret;
}

M
Minghao Li 已提交
1241
// Quiesces a node before it is closed.
//   1. If the FSM exposes FpApplyQueueItems, polls it every 20 ms until it reports 0 or -1.
//   2. Force-stops (if started) and destroys the new-node snapshot receiver.
//   3. Stops the election timer and the heartbeat timers.
// A NULL pSyncNode is accepted and ignored.
void syncNodePreClose(SSyncNode* pSyncNode) {
  // the original guarded only the first section against NULL and then dereferenced the
  // pointer anyway; bail out up front instead
  if (pSyncNode == NULL) return;

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);
}

1271
// Releases the memory of one heartbeat timer data object.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1272

M
Minghao Li 已提交
1273
// Tears down a sync node and frees it. A NULL node is ignored.
// Order: raft store, replication manager, response manager, vote and index managers, log store,
// log buffer, raft cfg, then the ping/elect/heartbeat timers, the snapshot senders, the snapshot
// receiver, the FSM and finally the node itself.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // persistent raft state
  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  // replication and response tracking
  syncNodeLogReplMgrDestroy(pSyncNode);
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  // election bookkeeping
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  // leader index managers
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  // log store, log buffer and raft cfg
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  // snapshot senders: stop the running ones, then destroy
  for (int32_t idx = 0; idx < TSDB_MAX_REPLICA; ++idx) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[idx];
    if (pSender == NULL) continue;

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[idx] = NULL;
  }

  // snapshot receiver: force-stop if running, then destroy
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // the node owns its FSM
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1333
// Returns the snapshot strategy stored in the node's raft cfg.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1334

M
Minghao Li 已提交
1335 1336 1337
// timer control --------------
// (Re)arms the ping timer with pingTimerMS and syncs the timer's logic clock to the user clock.
// If the sync env is not initialized an error is logged and nothing is armed.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stops the ping timer: advances the user logic clock, stops the timer and clears the handle.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// (Re)arms the election timer to fire after `ms` milliseconds.
// Records the interval, fills electTimerParam (expected execution time, current logic clock,
// node pointer, NULL data) and resets the timer with the node rid as callback parameter.
// If the sync env is not initialized an error is logged and nothing is armed.
// Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t fireAt = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAt);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stops the election timer: advances its logic clock, stops the timer and clears the handle.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stops the election timer and arms it again with `ms` milliseconds.
// The results of the stop/start calls are not inspected. Always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1392
// Restarts the election timer with a fresh timeout: TIMER_MAX_MS for a stand-by node, otherwise
// a value from syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
}

M
Minghao Li 已提交
1407
// (Re)arms the node-level heartbeat timer with heartbeatTimerMS and syncs its logic clock to the
// user clock. If the sync env is not initialized an error is logged instead.
// The trace line is emitted in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1421
// Starts the per-peer heartbeat timer of every peer that has a timer slot.
// The node-level heartbeat timer path is compiled out. The result of syncHbTimerStart is not
// inspected. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1439 1440
// Stops the per-peer heartbeat timer of every peer that has a timer slot.
// The node-level heartbeat timer path is compiled out. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) continue;

    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return ret;
}

1458 1459 1460 1461 1462 1463
// Restart the heartbeat timers: stop them all, then start them all again.
// The results of both steps are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

M
Minghao Li 已提交
1464 1465
// Send pMsg to the node identified by destRaftId.
// The payload is passed through syncUtilMsgHtoN and flagged noResp before it is handed to the
// node's syncSendMSg callback, whose return value is returned unchanged.
// If no callback is installed the payload is freed, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilRaftId2EpSet(destRaftId, &destEpSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  return pSyncNode->syncSendMSg(&destEpSet, pMsg);
}

// Send pMsg to the node described by nodeInfo.
// The payload is passed through syncUtilMsgHtoN and flagged noResp before it is handed to the
// node's syncSendMSg callback, whose return value is returned unchanged.
// If no callback is installed the payload is freed, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR and -1 is returned (same contract as syncNodeSendMsgById;
// previously the payload was left unfreed and 0 was returned on this path).
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  return pSyncNode->syncSendMSg(&epSet, pMsg);
}

1495
// Report whether this node appears in `config`.
// Membership is evaluated two ways: by comparing each entry's fqdn/port with myNodeInfo, and by
// comparing the raft id derived from each entry's fqdn/port (plus this node's vgId) with myRaftId.
// The two answers are asserted to agree; the fqdn/port answer is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEndpoint = false;
  for (int32_t k = 0; k < config->replicaNum; ++k) {
    const SNodeInfo* pCand = &config->nodeInfo[k];
    if (strcmp(pCand->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        pCand->nodePort == pSyncNode->myNodeInfo.nodePort) {
      foundByEndpoint = true;
      break;
    }
  }

  bool foundByRaftId = false;
  for (int32_t k = 0; k < config->replicaNum; ++k) {
    const SNodeInfo* pCand = &config->nodeInfo[k];

    SRaftId candId;
    candId.addr = syncUtilAddr2U64(pCand->nodeFqdn, pCand->nodePort);
    candId.vgId = pSyncNode->vgId;

    if (syncUtilSameId(&candId, &pSyncNode->myRaftId)) {
      foundByRaftId = true;
      break;
    }
  }

  ASSERT(foundByEndpoint == foundByRaftId);
  return foundByEndpoint;
}

1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
// Return true when the two configurations differ in replica count, in myIndex, or in the
// fqdn/port of any replica slot (compared position by position); false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t slot = 0; slot < pOldCfg->replicaNum; ++slot) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[slot];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[slot];

    const bool sameEndpoint =
        (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0) && (pBefore->nodePort == pAfter->nodePort);
    if (!sameEndpoint) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1535
// Install a new replica configuration on this sync node.
//
//   pNewConfig             configuration to install; it is copied into pRaftCfg->cfg
//   lastConfigChangeIndex  index recorded as pRaftCfg->lastConfigIndex and appended via
//                          raftCfgAddConfigIndex
//
// Nothing is done when the new configuration equals the current one. Otherwise the
// configuration is stored and persisted. If this node is a member of the new configuration,
// its own identity, peer/replica tables, quorum, index managers, vote managers and snapshot
// senders are rebuilt, and the node then re-enters the leader role (plus a noop append) when it
// currently is leader, or the follower role in any other state.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    // report the actual vgroup instead of a hard-coded id
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node was a member before and is not a member any more
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change
  // each configuration is rendered into its own buffer so that both appear in the logs below
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders together with the replica ids they belonged to
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // reset new: a replica that also existed before keeps its old sender, moved to its new slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t oldIdx = 0; oldIdx < TSDB_MAX_REPLICA; ++oldIdx) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[oldIdx]) && oldSenders[oldIdx] != NULL) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
          sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[oldIdx]);

          pSyncNode->senders[i] = oldSenders[oldIdx];
          oldSenders[oldIdx] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                  i, host, port, pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // free old: whatever was not carried over above
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      // syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);
    sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
           pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1716 1717 1718 1719
// raft state change --------------
// Adopt `term` when it is greater than the stored current term: store the term, become
// follower, then clear the recorded vote. A term that is not greater is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (!(term > pSyncNode->pRaftStore->currentTerm)) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1727 1728 1729 1730 1731 1732
// Store `term` as the current term when it is greater than the stored one.
// Unlike syncNodeUpdateTerm, the node's state and recorded vote are left untouched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (!(term > pSyncNode->pRaftStore->currentTerm)) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1733
// Step down in response to `newTerm`.
//   - current term >  newTerm: traced and ignored.
//   - current term <  newTerm: store newTerm, become follower, clear the recorded vote.
//   - otherwise              : become follower unless the node already is one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  if (pSyncNode->pRaftStore->currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
            pSyncNode->pRaftStore->currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
          pSyncNode->pRaftStore->currentTerm);

  const bool termAdvances = pSyncNode->pRaftStore->currentTerm < newTerm;
  if (!termAdvances) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1759 1760
// Hand the node's response manager to syncRespCleanRsp; called when the node becomes follower.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1761
// Move the node into the follower state.
// debugStr is only appended to the final trace line.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a node that was leader no longer knows who the leader is
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // switch state and stop sending heartbeats
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // re-arm the election timer
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // forget the min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1812
// Move the node into the leader state (see the TLA+ BecomeLeader action above).
// debugStr is only appended to the final log line.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index is taken from syncNodeGetLastIndexTerm (log and snapshot) because the wal may
  // have been deleted. It is the same for every replica, so it is looked up once, not per slot.
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver (pSyncNode itself has already been dereferenced above, so it is not re-checked)
  if (pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Promote a candidate that holds a vote majority to leader, append a noop entry and log the
// resulting term / commit index / last index. A failed noop append is logged and otherwise ignored.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLastIndex >= 0);
  sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);
}

// Older candidate-to-leader transition: become leader, append a noop entry, try to advance the
// commit index, and replicate when there is more than one replica.
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  const bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1925 1926
// True when this node's vgId is 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1927
// Reset every peerStates slot: lastSendIndex to SYNC_INDEX_INVALID and lastSendTime to 0.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate: only the state field changes; the transition is then logged with the
// current term, commit index and last log index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower: delegates to syncNodeBecomeFollower, then logs the transition with the
// current term, commit index and last log index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower: delegates to syncNodeBecomeFollower, then logs the transition with the
// current term, commit index and last log index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
        pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1966 1967
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1968
// Record a vote for pRaftId in the raft store.
// Asserts that `term` is the current term and that no vote has been recorded yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1975
// simulate get vote from outside
M
Minghao Li 已提交
1976
// Vote for this node in the current term, then feed a locally built, granted
// SyncRequestVoteReply (from self to self) into the vote-granted and vote-respond trackers.
// If the reply message cannot be built, only the stored vote is recorded.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);

  SRpcMsg selfReply = {0};
  if (syncBuildRequestVoteReply(&selfReply, pSyncNode->vgId) != 0) {
    return;
  }

  SyncRequestVoteReply* pReply = selfReply.pCont;
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = pSyncNode->pRaftStore->currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  // the message never leaves this function, so release it here
  rpcFreeCont(selfReply.pCont);
}

M
Minghao Li 已提交
1994
// return if has a snapshot
M
Minghao Li 已提交
1995 1996
// True when the state machine provides FpGetSnapshotInfo and the snapshot it reports has
// lastApplyIndex >= SYNC_INDEX_BEGIN; false otherwise.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);

  if (snap.lastApplyIndex >= SYNC_INDEX_BEGIN) {
    return true;
  }
  return false;
}

M
Minghao Li 已提交
2007 2008
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
2009
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2010
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2011 2012
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2013 2014 2015 2016 2017 2018 2019
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2020 2021
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2022 2023
// Return the term that belongs to the last index of log and snapshot combined:
// the log store's last term when there is no snapshot or the log extends past the snapshot,
// otherwise the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    SyncTerm logTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
    return logTerm;
  }

  // has snapshot
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snap);
  }

  SyncTerm  result = 0;
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex > snap.lastApplyIndex) {
    result = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  } else {
    result = snap.lastApplyTerm;
  }

  return result;
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex and syncNodeGetLastTerm.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = idx;

  SyncTerm term = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = term;

  return 0;
}
M
Minghao Li 已提交
2052

M
Minghao Li 已提交
2053
// return append-entries first try index
M
Minghao Li 已提交
2054 2055 2056 2057 2058
// The index right after the node's last index (log and snapshot combined).
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2059 2060
// if index > 0, return index - 1
// else, return -1
2061 2062 2063 2064 2065 2066 2067 2068 2069
// Return index - 1, clamped from below to SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  const SyncIndex previous = index - 1;
  return (previous < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : previous;
}

M
Minghao Li 已提交
2070 2071 2072 2073
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2074 2075 2076 2077 2078 2079 2080 2081 2082
// Return the term of the entry that precedes `index`.
//   index <  SYNC_INDEX_BEGIN : SYNC_TERM_INVALID
//   index == SYNC_INDEX_BEGIN : 0
//   otherwise                 : term of entry index - 1, taken from the log store's LRU cache,
//                               else from the log store itself, else from the snapshot when its
//                               lastApplyIndex equals index - 1; SYNC_TERM_INVALID if all fail.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) return SYNC_TERM_INVALID;
  if (index == SYNC_INDEX_BEGIN) return 0;

  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      hCached = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;

  if (hCached != NULL) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hCached);

    pSyncNode->pLogStore->cacheHit++;
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
  } else {
    pSyncNode->pLogStore->cacheMiss++;
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }

  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    SyncTerm preTerm = pPreEntry->term;

    // a cached entry is handed back to the cache; one read from the log store is destroyed
    if (hCached != NULL) {
      taosLRUCacheRelease(pCache, hCached, false);
    } else {
      syncEntryDestroy(pPreEntry);
    }

    return preTerm;
  }

  // the entry could not be read: fall back to the snapshot
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2133 2134 2135 2136

// get pre index and term of "index"
// Fill *pPreIndex and *pPreTerm from syncNodeGetPreIndex and syncNodeGetPreTerm for `index`.
// Always returns 0; a failed term lookup is reported only through the value stored in *pPreTerm.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIndex;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
2141
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2142
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2143

S
Shengliang Guan 已提交
2144 2145 2146
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2147
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2148 2149
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2150
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2151 2152
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2153
    }
M
Minghao Li 已提交
2154

M
Minghao Li 已提交
2155
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2156 2157
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2158
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2159 2160
      rpcFreeCont(rpcMsg.pCont);
      return;
2161
    }
M
Minghao Li 已提交
2162

S
Shengliang Guan 已提交
2163
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2164
  }
M
Minghao Li 已提交
2165 2166
}

M
Minghao Li 已提交
2167
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2168
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2169

M
Minghao Li 已提交
2170 2171
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2172

2173
  if (pNode == NULL) return;
M
Minghao Li 已提交
2174 2175 2176 2177 2178

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2179

2180
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2181 2182 2183 2184
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2185

S
Shengliang Guan 已提交
2186
  SRpcMsg rpcMsg = {0};
2187 2188
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2189

S
Shengliang Guan 已提交
2190
  if (code != 0) {
M
Minghao Li 已提交
2191
    sError("failed to build elect msg");
M
Minghao Li 已提交
2192
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2193
    return;
M
Minghao Li 已提交
2194 2195
  }

S
Shengliang Guan 已提交
2196
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2197
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2198 2199 2200

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2201
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2202
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2203
    syncNodeRelease(pNode);
2204
    return;
M
Minghao Li 已提交
2205
  }
M
Minghao Li 已提交
2206 2207

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2208 2209
}

M
Minghao Li 已提交
2210
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2211
  if (!syncIsInit()) return;
2212

S
Shengliang Guan 已提交
2213 2214 2215 2216
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2217
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2218 2219 2220
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2221
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2222
        return;
2223
      }
M
Minghao Li 已提交
2224

2225
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2226 2227
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2228
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2229 2230
        rpcFreeCont(rpcMsg.pCont);
        return;
2231
      }
S
Shengliang Guan 已提交
2232 2233 2234 2235

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2236
    } else {
S
Shengliang Guan 已提交
2237 2238
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2239
    }
M
Minghao Li 已提交
2240 2241 2242
  }
}

2243
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2244
  int64_t hbDataRid = (int64_t)param;
2245
  int64_t tsNow = taosGetTimestampMs();
2246

2247 2248
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2249
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2250 2251
    return;
  }
2252

2253
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2254
  if (pSyncNode == NULL) {
2255
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2256
    sError("hb timer get pSyncNode NULL");
2257 2258 2259 2260 2261 2262 2263 2264
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2265
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2266 2267 2268
    return;
  }

M
Minghao Li 已提交
2269
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2270 2271
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2272
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2273 2274 2275
    return;
  }

M
Minghao Li 已提交
2276
  if (pSyncNode->pRaftStore == NULL) {
2277 2278
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2279
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2280 2281 2282
    return;
  }

M
Minghao Li 已提交
2283
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2284 2285

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2286 2287 2288
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2289
    if (timerLogicClock == msgLogicClock) {
2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2310
        pSyncMsg->timeStamp = tsNow;
2311 2312 2313 2314 2315 2316

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2317 2318
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2319 2320 2321 2322 2323 2324 2325 2326
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2327 2328
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2329 2330
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2331 2332 2333 2334
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2335
    } else {
M
Minghao Li 已提交
2336 2337
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2338 2339
    }
  }
2340 2341 2342

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2343 2344
}

2345 2346 2347 2348 2349
// Build a noop raft entry for the current term and enqueue it as a client request.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_SYN_NOT_LEADER when this node is not
// the leader, -1 when the noop entry or the request message cannot be built, and the enqueue
// callback's non-zero code when enqueueing fails. The message content is freed on every failure
// path because nobody else has taken it over.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  // only a leader may propose; the original test was inverted and rejected the leader instead
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestroy(pEntry);
  if (code != 0) {
    // do not enqueue a message that was never built
    sError("failed to build noop msg since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2369 2370
// Deleter handed to the LRU cache: frees the cached value. key and keyLen are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2371 2372 2373 2374
// Insert pEntry into the log store's LRU cache, keyed by the entry index.
// The charge is sizeof(*pEntry) + pEntry->dataLen and deleteCacheEntry is registered as deleter.
// The cache handle is returned through h. Returns 0 when the insert reports
// TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus insertStatus = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry,
                                              charge, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (insertStatus == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2386
// Append a raft entry to the node's log buffer and advance the match index.
//
// The entry is destroyed here, and -1 returned, when its payload is shorter than SMsgHead or
// when the log buffer rejects it (terrno = TSDB_CODE_SYN_BUFFER_FULL; the FSM is told about the
// failure through syncLogFsmExecute). With more than one replica the function returns 0 right
// after proceeding; with a single replica it also updates the commit index and commits the
// buffer, returning -1 if that commit fails.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  const bool hasMsgHead = (pEntry->dataLen >= sizeof(SMsgHead));
  if (!hasMsgHead) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // hand the entry to the log buffer
  const int32_t appendCode = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendCode < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
                            TSDB_CODE_SYN_BUFFER_FULL);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // move the match index forward, replicating when needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  const bool singleReplica = !(ths->replicaNum > 1);
  if (singleReplica) {
    // nobody to wait for: commit up to the match index right away
    (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2428
// Report whether at least `quorum` peers have an expired heartbeat reply.
// A peer counts as expired when its recorded receive time is older than tsHeartbeatTimeout;
// peers whose receive time is 0 or -1 are skipped. Always false for a single replica.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int32_t       expiredPeers = 0;
  const int64_t now = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    const int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    const bool    noRecvTime = (lastRecv == 0 || lastRecv == -1);

    if (!noRecvTime && (now - lastRecv > tsHeartbeatTimeout)) {
      expiredPeers++;
    }
  }

  return expiredPeers >= pSyncNode->quorum;
}

2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469
// True when any of the node's snapshot senders exists and has its start flag set.
// A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t slot = 0; slot < pSyncNode->replicaNum; ++slot) {
    if (pSyncNode->senders[slot] == NULL) continue;
    if (pSyncNode->senders[slot]->start) return true;
  }

  return false;
}

// True when the node has a snapshot receiver (pNewNodeReceiver) whose start flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  return pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && pSyncNode->pNewNodeReceiver->start;
}

M
Minghao Li 已提交
2470
// Build a noop entry at the log buffer's end index for the current term and append it.
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built, 0 otherwise.
// NOTE(review): the result of syncNodeAppend() is deliberately not propagated here, so an
// append failure still yields 0 -- confirm with the callers whether that is intended.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  currentTerm = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(currentTerm, endIndex, ths->vgId);
  if (pNoop == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  (void)syncNodeAppend(ths, pNoop);
  return 0;
}

// Legacy noop append that writes straight to the log store.
//
// A noop entry is built at the log store's write index for the current term. On a leader it is
// appended to the log store and then offered to the LRU cache; on any other state nothing is
// appended. The entry is destroyed here unless the cache returned a handle for it, in which
// case only the handle is released.
// Returns 0 on success, -1 when the entry cannot be built (terrno = TSDB_CODE_OUT_OF_MEMORY)
// or when the log store rejects the append.
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    // report allocation failure the same way syncNodeAppendNoop does instead of asserting
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      // the entry was neither stored nor cached, so it must be freed here
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2513 2514
// Handle an incoming heartbeat.
//
// A reply carrying this node's current term, start time and the receive timestamp is always
// sent back to the sender. In addition:
//   - same term and this node is not leader: the sender's receive time is recorded, the election
//     timer is reset and minMatchIndex is taken from the message; a follower also enqueues a
//     SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the leader's commit index;
//   - message term >= current term and this node is not a follower: a SYNC_LOCAL_CMD_STEP_DOWN
//     local command carrying the message term is enqueued.
// Local commands are only built when both syncEqMsg and msgcb are set, so nothing is left
// unowned when there is no queue to hand the message to.
// Returns 0, or -1 when the reply message cannot be built.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) != 0 || rpcMsg.pCont == NULL) {
    // without a reply body the fields below cannot be filled in
    sError("vgId:%d, failed to build heartbeat reply since %s", ths->vgId, terrstr());
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  const bool canEnqueue = (ths->syncEqMsg != NULL && ths->msgcb != NULL);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER && canEnqueue) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
        sError("vgId:%d, failed to build fc-commit msg since %s", ths->vgId, terrstr());
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        // keep a private copy: the message belongs to the queue once it has been enqueued
        SyncIndex fcIndex = pMsg->commitIndex;

        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = fcIndex;

        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER && canEnqueue) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
      sError("vgId:%d, failed to build step-down msg since %s", ths->vgId, terrstr());
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    } else {
      // same rule as above: log from the local copy, never from the enqueued message
      SyncTerm sdNewTerm = pMsg->term;

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = sdNewTerm;

      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, sdNewTerm);
      }
    }
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2595
// Handle a heartbeat reply from a peer.
// Looks up the log replication manager for the replying peer, logs the reply, records the
// receive time for the peer in pMatchIndex and lets the replication manager process the reply.
// Returns -1 when no replication manager exists for the peer, otherwise whatever
// syncLogReplMgrProcessHeartbeatReply returns.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // print the address as 16 zero-padded hex digits (the conversion was written "0x016%" before)
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2615
// Legacy heartbeat-reply handler: logs the reply together with its delay and stores the receive
// time for the replying peer in pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  char            tbuf[40] = {0};
  const STraceId* trace = &pRpcMsg->info.traceId;
  TRACE_TO_STR(trace, tbuf);

  const int64_t recvTs = taosGetTimestampMs();
  const int64_t delay = recvTs - pReply->timeStamp;
  syncLogRecvHeartbeatReply(ths, pReply, delay, tbuf);

  // remember when this peer was last heard from
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, recvTs);

  return 0;
}

S
Shengliang Guan 已提交
2631 2632
// Execute a local command message.
//   SYNC_LOCAL_CMD_STEP_DOWN    -> step down to the term carried in sdNewTerm
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> update the commit index to fcIndex and commit the log buffer
// Any other command is logged as an error. Always returns 0, even when the commit fails.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      (void)syncNodeUpdateCommitIndex(ths, pCmd->fcIndex);
      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

// Legacy local command handler.
//   SYNC_LOCAL_CMD_STEP_DOWN    -> step down to the term carried in sdNewTerm
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> follower commit up to fcIndex
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->sdNewTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT:
      syncNodeFollowerCommit(ths, pCmd->fcIndex);
      break;

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2668 2669 2670 2671 2672 2673 2674 2675 2676 2677
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2678

2679
// Turn a client request into a raft entry and, on a leader, append it to the log buffer.
//
// The entry is built at the log buffer's end index for the current term, either from a
// TDMT_SYNC_CLIENT_REQUEST message body or from a plain rpc message.
// Returns -1 when the entry cannot be built or when this node is not leader (the entry is
// destroyed in that case). On a leader the index is reported through pRetIndex (when non-NULL)
// and the result of syncNodeAppend is returned.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  const SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  const SyncTerm  term = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex) {
    (*pRetIndex) = index;
  }

  int32_t appendCode = syncNodeAppend(ths, pEntry);
  if (appendCode < 0) {
    sNError(ths, "failed to append blocking msg");
  }
  return appendCode;
}

2716 2717
// Legacy client-request handler that writes straight to the log store.
//
// The entry is built at the log store's write index for the current term. On a leader it is
// appended to the log store, offered to the LRU cache, and then either replicated
// (replicaNum > 1) or committed locally (replicaNum == 1). On any other state nothing is
// appended.
//
// Returns -1 on a leader when the entry cannot be built or the append fails; in the
// multi-replica append-failure case FpCommitCb is invoked with code -1 first. pRetIndex is not
// written on these -1 paths. Otherwise returns 0 and reports the entry index through pRetIndex
// (SYNC_INDEX_INVALID when there is no entry).
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = NULL;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    if (pEntry == NULL) {
      // never pass a missing entry on to the log store
      sError("vgId:%d, failed to build raft entry for client request since %s", ths->vgId, terrstr());
      return -1;
    }

    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      if (ths->replicaNum != 1) {
        // del resp mgr, call FpCommitCb
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
      }

      // the entry has not been cached yet (h is still NULL here), so it is simply destroyed
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);

    // if mulit replica, start replicate right now
    if (ths->replicaNum > 1) {
      syncNodeReplicate(ths);
    }

    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    *pRetIndex = (pEntry != NULL) ? pEntry->index : SYNC_INDEX_INVALID;
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return 0;
}
M
Minghao Li 已提交
2805

S
Shengliang Guan 已提交
2806 2807 2808
// Map a sync state to its lower-case display name; states not listed here map to "unknown".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) return "follower";
  if (state == TAOS_SYNC_STATE_CANDIDATE) return "candidate";
  if (state == TAOS_SYNC_STATE_LEADER) return "leader";
  if (state == TAOS_SYNC_STATE_ERROR) return "error";
  if (state == TAOS_SYNC_STATE_OFFLINE) return "offline";
  return "unknown";
}
2822

2823
#if 0
// Disabled code, kept for reference only.
// Applies a leader-transfer entry on a follower: when this node is the transfer target (same
// raft id, or same fqdn and port) the election timer is restarted with a 1 ms timeout, and the
// FSM's FpLeaderTransferCb, if set, is notified. Returns 0 on every path.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool idMatches = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool addrMatches = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                     pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (idMatches || addrMatches) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2890
// Find this node inside a new configuration and store its position in pNewCfg->myIndex.
// Each configured node is turned into a raft id (address derived from fqdn and port, vgId of
// this node) and compared with ths->myRaftId. Returns 0 on the first match, -1 when this node
// is not part of the configuration.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t pos = 0;

  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[pos].nodeFqdn, pNewCfg->nodeInfo[pos].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }

    ++pos;
  }

  return -1;
}

2905 2906 2907 2908
// True when all three hold: the node has exactly one replica, the message type is a user commit
// (per syncUtilUserCommit), and the vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2909
// Apply the log entries in [beginIndex, endIndex] to the FSM.
//
// NOTE: the function starts with ASSERT(false), i.e. it is not expected to be called any more.
//
// Steps, in order:
//   1. An empty range returns 0; a NULL node returns -1.
//   2. If the FSM reports a snapshot whose lastApplyIndex covers beginIndex, beginIndex is moved
//      to just after the snapshot.
//   3. For every index in the remaining range the entry is taken from the LRU cache or, on a
//      miss, from the log store (entries that cannot be loaded are logged and skipped).
//      FpCommitCb is invoked for user-commit entries, except on a single-replica node with
//      vgId != 1 that has finished restoring.
//   4. When the entry is the last one in the log store and restore has not finished yet,
//      FpRestoreFinishCb is invoked and restoreFinish is set.
// Returns 0 once the range has been walked.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  ASSERT(false);
  if (beginIndex > endIndex) return 0;
  if (ths == NULL) return -1;

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // everything up to the snapshot is already applied
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  if (ths->pFsm == NULL) return 0;

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) continue;

    SSyncRaftEntry* pEntry;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));

    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);

      ths->pLogStore->cacheHit++;
      sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
    } else {
      ths->pLogStore->cacheMiss++;
      sNTrace(ths, "miss cache index:%" PRId64, i);

      int32_t loadCode = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (loadCode != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      // a restored single-replica node with vgId != 1 does not run the callback from here
      bool internalExecute = !((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1);

      sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
              TMSG_INFO(pEntry->msgType));

      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

    // restore finish
    // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore) && ths->restoreFinish == false) {
      if (ths->pFsm->FpRestoreFinishCb != NULL) {
        ths->pFsm->FpRestoreFinishCb(ths->pFsm);
      }
      ths->restoreFinish = true;

      int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
      sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
    }

    rpcFreeCont(rpcMsg.pCont);

    // cached entries are only un-pinned; entries loaded from the log store are freed
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pEntry);
    }
  }

  return 0;
}

// Reports whether pRaftId equals (per syncUtilSameId) one of the first
// ths->replicaNum entries of ths->replicasId.
// Returns true on the first match, false when no entry matches.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;
  while (idx < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      return true;
    }
    ++idx;
  }
  return false;
}

// Finds the snapshot sender kept at the replica slot whose id equals pDestId.
// Every one of the ths->replicaNum slots is examined; should more than one
// slot match, the sender of the last matching slot is returned.
// Returns NULL when no slot matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  for (int32_t slot = 0; slot < ths->replicaNum; slot++) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;
    }
    pFound = ths->senders[slot];
  }
  return pFound;
}
M
Minghao Li 已提交
3050

3051 3052
// Finds the per-peer heartbeat timer stored at the replica slot whose id
// equals pDestId. The whole replica list is scanned, so the last matching
// slot is the one used. Returns NULL when no slot matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t hit = -1;  // index of the most recent matching slot, -1 if none
  for (int32_t slot = 0; slot < ths->replicaNum; slot++) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      hit = slot;
    }
  }

  if (hit < 0) {
    return NULL;
  }
  return &ths->peerHeartbeatTimerArr[hit];
}

M
Minghao Li 已提交
3061 3062
// Finds the SPeerState stored at the replica slot whose id equals pDestId.
// The scan does not stop at the first match, so the last matching slot is
// the one returned. Returns NULL when no slot matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pMatch = NULL;
  int32_t     slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pMatch = &ths->peerStates[slot];
    }
    slot++;
  }
  return pMatch;
}

// Decides whether an append-entries message should be sent to pDestId.
//
// Returns false when:
//   - no peer state exists for pDestId (an error is logged), or
//   - the index that would be sent (pMsg->prevLogIndex + 1) equals the peer's
//     lastSendIndex and less than SYNC_APPEND_ENTRIES_TIMEOUT_MS has elapsed
//     since the peer's lastSendTime.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  // A different index than the one last sent is always eligible.
  if (pPeer->lastSendIndex != nextIndex) {
    return true;
  }

  // Same index as last time: only resend once the timeout window has passed.
  if (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS) {
    return false;
  }
  return true;
}

M
Minghao Li 已提交
3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101
// Checks whether pSyncNode is currently allowed to "change".
// NOTE(review): presumably this gates a membership/config change - confirm with callers.
//
// Returns false (and logs an error) when any of the following holds:
//   1. pSyncNode->changing is already set;
//   2. commitIndex is at least SYNC_INDEX_BEGIN but differs from the index
//      reported by syncNodeGetLastIndex();
//   3. the snapshot sender of any peer exists and has its `start` flag set.
// Returns true when none of these applies.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  bool hasCommitted = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommitted) {
    SyncIndex tailIndex = syncNodeGetLastIndex(pSyncNode);
    if (tailIndex != pSyncNode->commitIndex) {
      sError("sync cannot change2");
      return false;
    }
  }

  int32_t peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[peer]);
    bool                 senderActive = (pPeerSender != NULL) && pPeerSender->start;
    if (senderActive) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}