syncMain.c 89.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
26
#include "syncPipeline.h"
M
Minghao Li 已提交
27
#include "syncRaftCfg.h"
M
Minghao Li 已提交
28
#include "syncRaftLog.h"
M
Minghao Li 已提交
29
#include "syncRaftStore.h"
M
Minghao Li 已提交
30
#include "syncReplication.h"
M
Minghao Li 已提交
31 32
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
33
#include "syncRespMgr.h"
M
Minghao Li 已提交
34
#include "syncSnapshot.h"
M
Minghao Li 已提交
35
#include "syncTimeout.h"
M
Minghao Li 已提交
36
#include "syncUtil.h"
M
Minghao Li 已提交
37
#include "syncVoteMgr.h"
38
#include "tglobal.h"
M
Minghao Li 已提交
39
#include "tref.h"
M
Minghao Li 已提交
40

M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
46
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
47
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
48 49 50
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
51 52 53 54 55 56 57 58 59 60 61
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
M
Minghao Li 已提交
62

63
/*
 * Open a sync node described by pSyncInfo and register it via syncNodeAdd().
 *
 * On success the node's ping/elect/heartbeat settings and message callbacks
 * are copied from pSyncInfo and the registered reference id is returned.
 * Returns -1 when the node cannot be opened or registered; in the latter
 * case the freshly opened node is closed again.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (!pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping settings
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election settings
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat settings
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
84

B
Benguang Zhao 已提交
85
int32_t syncStart(int64_t rid) {
S
Shengliang Guan 已提交
86
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
87
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
88
    sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
B
Benguang Zhao 已提交
89 90 91 92
    return -1;
  }

  if (syncNodeRestore(pSyncNode) < 0) {
93
    sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr());
94
    goto _err;
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96

B
Benguang Zhao 已提交
97 98 99 100
  if (syncNodeStart(pSyncNode) < 0) {
    sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
    goto _err;
  }
M
Minghao Li 已提交
101

B
Benguang Zhao 已提交
102 103
  syncNodeRelease(pSyncNode);
  return 0;
M
Minghao Li 已提交
104

105 106 107
_err:
  syncNodeRelease(pSyncNode);
  return -1;
M
Minghao Li 已提交
108 109
}

M
Minghao Li 已提交
110
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
111
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
112
  if (pSyncNode != NULL) {
113
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
114
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
115
    syncNodeRemove(rid);
M
Minghao Li 已提交
116 117 118
  }
}

M
Minghao Li 已提交
119 120
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
121 122 123
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
124 125 126
  }
}

127 128 129 130 131 132 133 134
/*
 * Run syncNodePostClose() on the node registered under rid.
 * Does nothing when rid cannot be acquired.
 */
void syncPostStop(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return;

  syncNodePostClose(pNode);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
135 136 137
/*
 * A new config is accepted only if syncNodeInConfig() holds for it and
 * its replica count differs from the current one by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
140
/*
 * Apply a new replica configuration to the sync node registered under rid.
 *
 * Returns -1 if the node cannot be acquired, or if the new config is
 * rejected by syncNodeCheckNewConfig() (terrno is then set to
 * TSDB_CODE_SYN_NEW_CONFIG_ERROR). Returns 0 otherwise.
 *
 * When the node is currently leader, the heartbeat timer is stopped, every
 * per-peer heartbeat timer slot is re-initialised against the (possibly
 * changed) replica ids, and the heartbeat timer is started again.
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: pSyncNode must not be
    // dereferenced after syncNodeRelease().
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  // The result of the index update was never acted upon; keep that explicit.
  (void)syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
168

S
Shengliang Guan 已提交
169 170 171 172
/*
 * Dispatch a sync RPC message to the handler matching pMsg->msgType on the
 * node registered under rid.
 *
 * Returns the handler's result. Returns -1 when the sync environment is not
 * initialised, when the node cannot be acquired, or when the message type is
 * not handled here (terrno is then TSDB_CODE_MSG_NOT_PROCESSED).
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  int32_t code = -1;
  if (!syncIsInit()) return code;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return code;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pSyncNode, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
  }

  // Log before releasing: pSyncNode->vgId must not be read once the
  // reference has been dropped.
  if (code != 0) {
    sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
           terrno);
  }
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
223
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
224
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
225
  if (pSyncNode == NULL) return -1;
226

S
Shengliang Guan 已提交
227
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
228
  syncNodeRelease(pSyncNode);
229 230 231
  return ret;
}

232
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
S
Shengliang Guan 已提交
233
  SSyncNode* pNode = syncNodeAcquire(rid);
234
  if (pNode == NULL) return -1;
S
Shengliang Guan 已提交
235 236

  SRpcMsg rpcMsg = {0};
237
  int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
S
Shengliang Guan 已提交
238 239 240
  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;

  syncNodeRelease(pNode);
241
  if (ret == 1) {
242
    sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
243
    rpcSendResponse(&rpcMsg);
244 245
    return 0;
  } else {
246
    sError("no message handle to send timeout response, seq:%" PRId64, seq);
247
    return -1;
248
  }
S
Shengliang Guan 已提交
249 250
}

M
Minghao Li 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
/*
 * Return the smallest match index recorded for any peer, or
 * SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum unconditionally
    if (peer == 0 || current < lowest) {
      lowest = current;
    }
  }

  return lowest;
}

267
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
268
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
269
  if (pSyncNode == NULL) {
270
    sError("sync begin snapshot error");
271 272
    return -1;
  }
273

274 275
  int32_t code = 0;

M
Minghao Li 已提交
276
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
277 278 279
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
280 281 282
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
283 284 285
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
286 287
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
288
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
289 290 291
      return 0;
    }

M
Minghao Li 已提交
292 293 294
    goto _DEL_WAL;

  } else {
295 296 297 298 299 300 301 302 303 304
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
305 306 307 308
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

309
      lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
310

M
Minghao Li 已提交
311 312 313 314 315 316
      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
317 318 319 320
            sNTrace(pSyncNode,
                    "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                    " of dnode:%d, do not delete wal",
                    lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
M
Minghao Li 已提交
321

S
Shengliang Guan 已提交
322
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
323 324 325 326 327 328
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
329 330 331
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
332
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
333 334 335 336
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
337
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
338
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
339 340 341
        return 0;

      } else {
S
Shengliang Guan 已提交
342
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
343
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
344 345 346 347 348 349 350 351 352
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
353 354 355
    }
  }

M
Minghao Li 已提交
356
_DEL_WAL:
357

M
Minghao Li 已提交
358
  do {
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
379

M
Minghao Li 已提交
380
      } else {
381 382
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
383
      }
384
    }
M
Minghao Li 已提交
385
  } while (0);
386

S
Shengliang Guan 已提交
387
  syncNodeRelease(pSyncNode);
388 389 390 391
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
392
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
393
  if (pSyncNode == NULL) {
394
    sError("sync end snapshot error");
395 396 397
    return -1;
  }

398 399 400 401
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
402
    if (code != 0) {
403
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
404
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
405 406
      return -1;
    } else {
S
Shengliang Guan 已提交
407
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
408 409
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
410
  }
411

S
Shengliang Guan 已提交
412
  syncNodeRelease(pSyncNode);
413 414 415
  return code;
}

M
Minghao Li 已提交
416
/*
 * Invoke syncNodeStepDown() with newTerm on the node registered under rid.
 * Returns 0 on success, -1 if the node cannot be acquired.
 */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);

  return 0;
}

428
/*
 * Report whether this node can currently serve reads.
 *
 * - NULL node: false, terrno = TSDB_CODE_SYN_INTERNAL_ERROR.
 * - Not leader: false, terrno = TSDB_CODE_SYN_NOT_LEADER.
 * - Leader with restoreFinish set: true.
 * - Otherwise the node is ready only if the apply queue is empty, the log is
 *   non-empty, and the last log entry is a TDMT_SYNC_NOOP written in the
 *   current term. If not, terrno = TSDB_CODE_SYN_RESTORING.
 *
 * The last entry is looked up in the log store's LRU cache first and read
 * through syncLogGetEntry() on a miss; hit/miss counters are updated.
 */
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("sync ready for read error");
    return false;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSyncNode->restoreFinish) {
    return true;
  }

  bool ready = false;

  // only inspect the log once the apply queue has drained and the log has entries
  if (pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm) &&
      !pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
    SyncIndex       tailIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    SSyncRaftEntry* pTail = NULL;
    SLRUCache*      pLru = pSyncNode->pLogStore->pCache;
    LRUHandle*      hCache = taosLRUCacheLookup(pLru, &tailIndex, sizeof(tailIndex));
    int32_t         rc = 0;

    if (hCache != NULL) {
      pTail = (SSyncRaftEntry*)taosLRUCacheValue(pLru, hCache);

      pSyncNode->pLogStore->cacheHit++;
      sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", tailIndex, pTail->bytes, pTail);
    } else {
      pSyncNode->pLogStore->cacheMiss++;
      sNTrace(pSyncNode, "miss cache index:%" PRId64, tailIndex);

      rc = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, tailIndex, &pTail);
    }

    if (rc == 0 && pTail != NULL) {
      ready = (pTail->originalRpcType == TDMT_SYNC_NOOP && pTail->term == pSyncNode->raftStore.currentTerm);

      // a cached entry is handed back to the cache; an entry read from the log is destroyed here
      if (hCache != NULL) {
        taosLRUCacheRelease(pLru, hCache, false);
      } else {
        syncEntryDestroy(pTail);
      }
    }
  }

  if (!ready) {
    terrno = TSDB_CODE_SYN_RESTORING;
  }

  return ready;
}

/*
 * rid-based wrapper around syncNodeIsReadyForRead().
 * Returns false if the node cannot be acquired.
 */
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync ready for read error");
    return false;
  }

  bool canRead = syncNodeIsReadyForRead(pNode);
  syncNodeRelease(pNode);

  return canRead;
}
M
Minghao Li 已提交
503

504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
/*
 * rid-based wrapper around syncNodeSnapshotSending().
 * Returns false if the node cannot be acquired.
 */
bool syncSnapshotSending(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);

  return sending;
}

/*
 * rid-based wrapper around syncNodeSnapshotRecving().
 * Returns false if the node cannot be acquired.
 */
bool syncSnapshotRecving(int64_t rid) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);

  return receiving;
}

M
Minghao Li 已提交
526 527
/*
 * Pick a peer and start a leader transfer to it.
 *
 * Fails with TSDB_CODE_SYN_ONE_REPLICA (-1) when the node has no peers.
 * Returns 0 without doing anything unless the node is leader with more than
 * one replica. The first peer is the default target; with exactly two peers
 * the second is chosen when its match index is strictly greater.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t target = 0;
  if (pSyncNode->peersNum == 2) {
    SyncIndex first = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
    SyncIndex second = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
    if (second > first) {
      target = 1;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[target];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}

M
Minghao Li 已提交
549 550
/*
 * Propose a leader-transfer message naming newLeader as the next leader.
 *
 * Fails with TSDB_CODE_SYN_ONE_REPLICA (-1) for a single-replica node and
 * with -1 when the transfer message cannot be built. Otherwise returns the
 * result of syncNodePropose(). The message content is freed here after the
 * proposal returns.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  // NOTE(review): assumes syncBuildLeaderTransfer() returns 0 on success, like
  // syncBuildClientRequest() in this file. The content pointer is checked as
  // well because it is dereferenced right below.
  if (code != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

571 572
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
573

S
Shengliang Guan 已提交
574
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
575 576 577
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
578 579 580 581 582
    if (pSyncNode->vgId != 1) {
      state.canRead = syncNodeIsReadyForRead(pSyncNode);
    } else {
      state.canRead = state.restored;
    }
583
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
584 585
  }

586
  return state;
M
Minghao Li 已提交
587 588
}

589
/*
 * Find the config-change index to associate with a snapshot taken at
 * snapshotLastApplyIndex: the largest recorded config index that does not
 * exceed snapshotLastApplyIndex, falling back to the first recorded index
 * when none qualifies. Asserts that at least one config index is recorded.
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);

  SyncIndex best = (pSyncNode->raftCfg.configIndexArr)[0];
  for (int32_t pos = 0; pos < pSyncNode->raftCfg.configIndexCount; ++pos) {
    SyncIndex candidate = (pSyncNode->raftCfg.configIndexArr)[pos];
    if (candidate <= snapshotLastApplyIndex && candidate > best) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

605 606
/*
 * Fill pEpSet with the fqdn/port of every replica in the node's raft config.
 *
 * numOfEps is reset to 0 first and stays 0 if the node cannot be acquired.
 * When at least one endpoint was written, inUse is set to the replica that
 * follows this node's own index (wrapping around).
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < pNode->raftCfg.cfg.replicaNum) {
    SEp* pDst = &pEpSet->eps[idx];
    tstrncpy(pDst->fqdn, pNode->raftCfg.cfg.nodeInfo[idx].nodeFqdn, TSDB_FQDN_LEN);
    pDst->port = (pNode->raftCfg.cfg.nodeInfo)[idx].nodePort;
    pEpSet->numOfEps++;
    sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pNode->vgId, idx, pDst->fqdn, pDst->port);
    ++idx;
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pNode);
}

S
Shengliang Guan 已提交
626
/*
 * rid-based wrapper around syncNodePropose().
 * Returns -1 if the node cannot be acquired, otherwise the proposal result.
 */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (!pNode) {
    sError("sync propose error");
    return -1;
  }

  int32_t code = syncNodePropose(pNode, pMsg, isWeak, seq);
  syncNodeRelease(pNode);

  return code;
}
M
Minghao Li 已提交
637

S
Shengliang Guan 已提交
638
/*
 * Propose a client message on this node.
 *
 * Rejected with -1 when the node is not leader (TSDB_CODE_SYN_NOT_LEADER),
 * when a node other than vgId 1 has not finished restoring, or when
 * syncNodeHeartbeatReplyTimeout() reports a timeout (both
 * TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * Optimized one-replica path: the request is handled inline through
 * syncNodeOnClientRequest(); on success the apply index/term are stored in
 * pMsg->info.conn and 1 is returned, on failure -1 is returned with
 * TSDB_CODE_SYN_INTERNAL_ERROR.
 *
 * Normal path: a response stub is registered, the message is wrapped into a
 * client request and enqueued through syncEqMsg. The stub is removed again
 * if building or enqueueing fails. If seq is non-NULL it receives the stub's
 * sequence number once enqueueing has been attempted. Returns the enqueue
 * result, or -1 when the request could not be built.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised so the failure log below never reads an indeterminate
    // value when the handler bails out before writing the index.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->raftStore.currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    } else {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }
  } else {
    SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
    uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
    SRpcMsg   rpcMsg = {0};
    int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
      return -1;
    }

    sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
    code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
    if (code != 0) {
      sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
      (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    }

    if (seq != NULL) *seq = seqNum;
    return code;
  }
}

S
Shengliang Guan 已提交
700
/*
 * Reset a per-peer heartbeat timer slot: no timer handle, zero counter and
 * logic clock, interval taken from the node's heartbeat baseline, callback
 * set to syncNodeEqPeerHeartbeatTimer and destination set to destId.
 * Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // identity of the slot
  pSyncTimer->pTimer = NULL;
  pSyncTimer->counter = 0;

  // schedule parameters
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->destId = destId;

  // bookkeeping
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
711
/*
 * Arm a per-peer heartbeat timer.
 *
 * Reuses the timer's SSyncHbTimerData when hbDataRid can still be acquired,
 * otherwise allocates and registers a new one. The data record is filled
 * from the timer (destination, logic clock, expected execution time) and the
 * underlying timer is reset to fire every timerMS / HEARTBEAT_TICK_NUM.
 *
 * Returns 0 on success and also when the sync environment is not initialised
 * (an error is logged and nothing is armed). Returns -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY when the data record cannot be allocated.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();

  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  // NOTE(review): the reference taken by syncHbTimerDataAcquire() is not
  // released in this function; presumably the timer callback owns it. Verify.
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      // without this check the assignments below would write through NULL
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since out of memory", pSyncNode->vgId);
      return -1;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);

  return 0;
}

S
Shengliang Guan 已提交
737
// Disarm a per-peer heartbeat timer.
// Advances the slot's logic clock, stops and forgets the timer handle, then
// removes the registered timer data and marks the slot's data rid as -1.
// Always returns 0; pSyncNode is not consulted.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  // timer handle
  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // timer data record
  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

747
// Re-seed the log store from the FSM snapshot position when the stored log
// range does not line up with it.
//
// The snapshot's lastApplyIndex is compared with the log store's begin/last
// index. A restore is requested only when the log ends before that index or
// begins more than one entry after it.
//
// Returns 0 when nothing had to be done or the restore succeeded, -1 when
// syncLogRestoreFromSnapshot reports failure.
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
  ASSERTS(pNode->pLogStore != NULL, "log store not created");
  ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
  ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");

  SSnapshot snapInfo = {0};
  pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapInfo);

  const SyncIndex snapshotVer = snapInfo.lastApplyIndex;
  const SyncIndex beginVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  const SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);

  // Log range already lines up with the snapshot position: nothing to restore.
  if (lastVer >= snapshotVer && beginVer <= snapshotVer + 1) {
    return 0;
  }

  if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, snapshotVer) != 0) {
    sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64,
           pNode->vgId, terrstr(), lastVer, snapshotVer);
    return -1;
  }

  return 0;
}

M
Minghao Li 已提交
767
// open/close --------------
S
Shengliang Guan 已提交
768 769
// Allocate a sync node and initialise it from pSyncInfo.
//
// Steps, in order: create the sync directory if missing; create or load
// raft_config.json (input options win when they differ from the file,
// otherwise pSyncInfo->syncCfg is overwritten with the file's config); copy
// callbacks and handles from pSyncInfo; build peer/replica ids; load the raft
// store; create vote/index managers, log store, log buffer, response manager,
// snapshot senders/receiver, replication manager and peer state.
//
// Ownership: the pFsm pointer is moved from pSyncInfo into the node
// (pSyncInfo->pFsm is set to NULL once taken).
//
// Returns the new node, or NULL on any failure. On failure the partially
// built node is torn down with syncNodeClose() and pSyncInfo->pFsm, if it was
// not yet moved into the node, is freed.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  // pSyncNode->vgId is only assigned further down, so every log line in this
  // config section reports pSyncInfo->vgId (the value the node will get).
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
    pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
    pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
    pSyncNode->raftCfg.lastConfigIndex = SYNC_INDEX_INVALID;
    pSyncNode->raftCfg.batchSize = pSyncInfo->batchSize;
    pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
    pSyncNode->raftCfg.configIndexCount = 1;
    pSyncNode->raftCfg.configIndexArr[0] = -1;

    if (syncWriteCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to create sync cfg file", pSyncInfo->vgId);
      goto _error;
    }
  } else {
    // update syncCfg by raft_config.json
    if (syncReadCfgFile(pSyncNode) != 0) {
      sError("vgId:%d, failed to read sync cfg file", pSyncInfo->vgId);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncInfo->vgId);
      pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
      if (syncWriteCfgFile(pSyncNode) != 0) {
        sError("vgId:%d, failed to write sync cfg file", pSyncInfo->vgId);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from sync cfg file", pSyncInfo->vgId);
      pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
    }
  }

  // init by SSyncInfo
  pSyncNode->vgId = pSyncInfo->vgId;
  SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
  sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // create raft log ring buffer
  pSyncNode->pLogBuf = syncLogBufferCreate();
  if (pSyncNode->pLogBuf == NULL) {
    sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (i != pSyncNode->raftCfg.cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
      syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // From here on the node owns pFsm; syncNodeClose() frees it.
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  if (raftStoreReadFile(pSyncNode) != 0) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // commit index starts at the snapshot's last applied index, if any
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
    sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
  pSyncNode->hbBaseLine = tsHeartbeatInterval;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    if (pSender == NULL) {
      // Go through _error so the node and everything created so far are
      // released; a bare "return NULL" here leaked them.
      sError("vgId:%d, failed to create snapshot sender, index:%d", pSyncNode->vgId, i);
      goto _error;
    }

    pSyncNode->senders[i] = pSender;
    sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
  if (pSyncNode->pNewNodeReceiver == NULL) {
    // Same as above: clean up instead of leaking the node.
    sError("vgId:%d, failed to create snapshot receiver", pSyncNode->vgId);
    goto _error;
  }
  sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
          pSyncNode->pNewNodeReceiver);

  // is config changing
  pSyncNode->changing = false;

  // replication mgr
  if (syncNodeLogReplMgrInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // peer state
  if (syncNodePeerStateInit(pSyncNode) < 0) {
    sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // start in syncNodeStart
  // start raft
  // syncNodeBecomeFollower(pSyncNode);

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  // init log buffer
  if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
    sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr());
    goto _error;
  }

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
         tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
  return pSyncNode;

_error:
  // pFsm still belongs to pSyncInfo only if the failure happened before the
  // hand-over above; otherwise syncNodeClose() frees it with the node.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1085 1086
// Raise the node's commitIndex to the snapshot's lastApplyIndex when the
// snapshot is ahead. Does nothing if no FSM or no FpGetSnapshotInfo callback
// is registered.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapInfo = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);

  if (pSyncNode->commitIndex < snapInfo.lastApplyIndex) {
    pSyncNode->commitIndex = snapInfo.lastApplyIndex;
  }
}

B
Benguang Zhao 已提交
1095
// Bring the node's commit position in line with the log store and commit the
// log buffer up to it.
//
// Fails with TSDB_CODE_WAL_LOG_INCOMPLETE (returns -1) when the log store has
// a last index other than -1 and the buffer's endIndex is not exactly one past
// it. Otherwise commitIndex becomes the larger of its current value and the
// store's commit index, and syncLogBufferCommit() is invoked; a negative
// result from it also yields -1. Returns 0 on success.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
  ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");

  const SyncIndex storeLastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  const SyncIndex storeCommitIdx = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  const SyncIndex bufEndIdx = pSyncNode->pLogBuf->endIndex;

  const bool storeHasLastVer = (storeLastVer != -1);
  if (storeHasLastVer && bufEndIdx != storeLastVer + 1) {
    terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
    sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
           pSyncNode->vgId, terrstr(), bufEndIdx - 1, storeLastVer);
    return -1;
  }

  ASSERT(bufEndIdx == storeLastVer + 1);
  pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, storeCommitIdx);
  sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

  const int32_t commitCode = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex);
  return (commitCode < 0) ? -1 : 0;
}

// Start raft on an opened node, then start the ping timer.
// A single-replica node advances its term, becomes leader immediately and
// appends a no-op entry; any other node starts as follower.
// Returns the result of syncNodeStartPingTimer() (non-zero is logged).
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  const int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  if (pingCode != 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
  }
  return pingCode;
}

B
Benguang Zhao 已提交
1140
// Start a node in stand-by mode: follower state, peer heartbeats stopped,
// election timer restarted with TIMER_MAX_MS, ping timer started.
// Returns -1 if either timer call reports a negative result, otherwise the
// value returned by syncNodeStartPingTimer().
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  if (syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS) < 0) {
    sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  const int32_t pingCode = syncNodeStartPingTimer(pSyncNode);
  if (pingCode < 0) {
    sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
    return -1;
  }

  return pingCode;
}

M
Minghao Li 已提交
1161
// First phase of shutting a node down.
//
// If the FSM exposes FpApplyQueueItems, polls it every 20 ms until it reports
// 0 or -1 items. Then stops the election timer and the peer heartbeat timers
// and calls syncRespCleanRsp() on the response manager.
//
// A NULL pSyncNode is a no-op. The original code tested for NULL only in the
// first condition and then dereferenced the pointer when stopping timers.
void syncNodePreClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
    while (1) {
      int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
      sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
      if (aqItems == 0 || aqItems == -1) {
        break;
      }
      taosMsleep(20);
    }
  }

  // Disabled: the same receiver teardown is performed by syncNodePostClose().
#if 0
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
           pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }
#endif

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // stop heartbeat timer
  syncNodeStopHeartbeatTimer(pSyncNode);

  // clean rsp
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1196 1197 1198
// Tear down the node's snapshot receiver, if one exists: stop it when it is
// running, destroy it and clear the pointer.
// NOTE(review): the debug text says "preclose" although this is the
// post-close path; the message is left unchanged.
void syncNodePostClose(SSyncNode* pSyncNode) {
  if (pSyncNode->pNewNodeReceiver == NULL) {
    return;
  }

  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
         pSyncNode->pNewNodeReceiver);

  snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
  pSyncNode->pNewNodeReceiver = NULL;
}

1209
// Free one heartbeat-timer data record with taosMemoryFree().
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}
M
Minghao Li 已提交
1210

M
Minghao Li 已提交
1211
// Destroy a sync node and everything it owns, then free the node itself.
// Accepts NULL (no-op). Components are released in this order: replication
// manager, response manager, vote/index managers, log store, log buffer;
// then the ping/elect/heartbeat timers are stopped; then snapshot senders and
// the snapshot receiver are stopped (if running) and destroyed; finally the
// FSM pointer and the node are freed.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }
  sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);

  // --- managers and log storage ---
  syncNodeLogReplMgrDestroy(pSyncNode);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  syncLogBufferDestroy(pSyncNode->pLogBuf);
  pSyncNode->pLogBuf = NULL;

  // --- timers ---
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  // --- snapshot senders ---
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
    SSyncSnapshotSender* pSender = pSyncNode->senders[slot];
    if (pSender == NULL) {
      continue;
    }

    sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSender);

    if (snapshotSenderIsStart(pSender)) {
      snapshotSenderStop(pSender, false);
    }

    snapshotSenderDestroy(pSender);
    pSyncNode->senders[slot] = NULL;
  }

  // --- snapshot receiver ---
  if (pSyncNode->pNewNodeReceiver != NULL) {
    if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
      snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
    }

    sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // --- fsm and the node itself ---
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }
  taosMemoryFree(pSyncNode);
}

1265
// Accessor: returns the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->raftCfg.snapshotStrategy;
}
M
Minghao Li 已提交
1266

M
Minghao Li 已提交
1267 1268 1269
// timer control --------------
// (Re)arm the ping timer with pingTimerMS and copy the user-side logic clock
// into pingTimerLogicClock. If the sync environment is not initialised the
// call only logs an error. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: advance the user-side logic clock by one, stop the
// timer and drop the handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// (Re)arm the election timer to fire after `ms` milliseconds.
// Records the timeout and the expected execution time (now + ms) together
// with the current election logic clock in electTimerParam, then resets the
// timer with the node's rid as callback parameter. If the sync environment is
// not initialised the call only logs an error. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  // snapshot of the parameters handed to the timer callback
  const int64_t fireAtMs = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), fireAtMs);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stop the election timer: advance the election logic clock by one, stop the
// timer and drop the handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and start it again with a timeout of `ms`.
// The results of the two calls are not inspected; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

1324
// Restart the election timer with a fresh timeout: TIMER_MAX_MS for a
// stand-by node, otherwise a value from syncUtilElectRandomMS() over
// [electBaseLine, 2 * electBaseLine].
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  const int32_t timeoutMs = pSyncNode->raftCfg.isStandBy
                                ? TIMER_MAX_MS
                                : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  (void)syncNodeRestartElectTimer(pSyncNode, timeoutMs);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          timeoutMs);
}

M
Minghao Li 已提交
1339
// (Re)arm the node-level heartbeat timer with heartbeatTimerMS and copy the
// user-side logic clock into heartbeatTimerLogicClock. When the sync
// environment is not initialised only an error is logged. The trace line is
// emitted in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1353
// Start the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() finds a timer slot. Results of the individual starts
// are not inspected; returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  // Disabled: node-level heartbeat timer start.
#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1371 1372
// Stop the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() finds a timer slot. Returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  // Disabled: node-level heartbeat timer stop.
#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return ret;
}

1390 1391 1392 1393 1394 1395
// Stop all peer heartbeat timers and start them again.
// The results of the two calls are not inspected; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

1396 1397 1398 1399 1400 1401 1402 1403
// Send pMsg to the peer whose raft address equals destRaftId->addr.
//
// The peer's endpoint set is looked up in pNode->peersId / peersEpset. When a
// peer is found and a send callback is installed, the message content is
// passed through syncUtilMsgHtoN(), marked noResp and handed to syncSendMSg.
//
// Returns the callback's result, or -1 when no peer matched or no callback is
// set. On any negative result the error is logged, pMsg->pCont is released
// with rpcFreeCont() and terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
  // locate the endpoint set of the destination peer (first match wins)
  SEpSet* pDestEpSet = NULL;
  for (int32_t peer = 0; peer < pNode->peersNum && pDestEpSet == NULL; ++peer) {
    if (pNode->peersId[peer].addr == destRaftId->addr) {
      pDestEpSet = &pNode->peersEpset[peer];
    }
  }

  int32_t code = -1;
  if (pDestEpSet != NULL && pNode->syncSendMSg != NULL) {
    syncUtilMsgHtoN(pMsg->pCont);
    pMsg->info.noResp = 1;
    code = pNode->syncSendMSg(pDestEpSet, pMsg);
  }

  if (code >= 0) {
    return code;
  }

  sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, pDestEpSet,
         DID(destRaftId), destRaftId->addr, terrno);
  rpcFreeCont(pMsg->pCont);
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return code;
}

1422
// Report whether this node is a member of the configuration pCfg.
// Membership is determined twice - by endpoint (fqdn and port equal to
// myNodeInfo) and by raft id (SYNC_ADDR of the entry plus this node's vgId
// compared with myRaftId) - and the two answers are asserted to agree.
// Returns the endpoint-based answer.
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
  // pass 1: endpoint comparison
  bool foundByEndpoint = false;
  for (int32_t idx = 0; idx < pCfg->replicaNum && !foundByEndpoint; ++idx) {
    const bool sameFqdn = (strcmp(pCfg->nodeInfo[idx].nodeFqdn, pNode->myNodeInfo.nodeFqdn) == 0);
    foundByEndpoint = sameFqdn && (pCfg->nodeInfo[idx].nodePort == pNode->myNodeInfo.nodePort);
  }

  // pass 2: raft id comparison
  bool foundByRaftId = false;
  for (int32_t idx = 0; idx < pCfg->replicaNum && !foundByRaftId; ++idx) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pCfg->nodeInfo[idx]),
        .vgId = pNode->vgId,
    };

    if (syncUtilSameId(&candidate, &pNode->myRaftId)) {
      foundByRaftId = true;
    }
  }

  ASSERT(foundByEndpoint == foundByRaftId);
  return foundByEndpoint;
}

1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462
// Returns true when the two configurations differ in replica count, in myIndex, or in the FQDN/port of any
// node at the same position; returns false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  // replica counts are equal here, so both node arrays can be walked in lock-step
  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];

    bool sameEndpoint =
        (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0) && (pBefore->nodePort == pAfter->nodePort);
    if (!sameEndpoint) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1463
// Applies a new replica configuration to the sync node.
//
// pSyncNode             node whose configuration is replaced
// pNewConfig            configuration to install (copied into pSyncNode->raftCfg.cfg)
// lastConfigChangeIndex log index recorded as raftCfg.lastConfigIndex and passed to syncAddCfgIndex()
//
// Nothing is done when syncIsConfigChanged() reports no difference. Otherwise the new configuration is
// stored and the standby flag is updated (cleared when this node is in the new configuration, set when it
// was in the old one but is not in the new one). When this node is in the new configuration, the
// peer/replica tables, quorum, index managers, vote managers and snapshot senders are rebuilt, the
// configuration is persisted, and the transition for the node's current role is re-run. When it is not in
// the new configuration, only the configuration is persisted.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->raftCfg.cfg = *pNewConfig;
  pSyncNode->raftCfg.lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change (sNInfo is given the node itself, so only the two replica counts are passed)
  sNInfo(pSyncNode, "begin do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);

  if (IamInNew) {
    pSyncNode->raftCfg.isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->raftCfg.isStandBy = 1;  // set standby
  }

  // add last config index
  syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);

  if (IamInNew) {
    // save the replica ids and snapshot senders of the old configuration
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = pSyncNode->senders[i];
      // NOTE(review): oldSenders[i] may be NULL here; presumably sSTrace tolerates that - confirm
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->raftCfg.cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      if (i != pSyncNode->raftCfg.cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->raftCfg.cfg.nodeInfo[i];
        syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[peerIdx], &pSyncNode->peersEpset[peerIdx]);
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: start from an empty table
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      pSyncNode->senders[i] = NULL;
    }

    // carry over the sender of every replica that already existed in the old configuration
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t oldIdx = 0; oldIdx < TSDB_MAX_REPLICA; ++oldIdx) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[oldIdx]) && oldSenders[oldIdx] != NULL) {
          sNTrace(pSyncNode, "snapshot sender reset for:%" PRId64 ", newIndex:%d, dnode:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, DID(&pSyncNode->replicasId[i]), oldSenders[oldIdx]);

          pSyncNode->senders[i] = oldSenders[oldIdx];
          oldSenders[oldIdx] = NULL;  // ownership moved; must not be destroyed below
          reset = true;

          // the sender may sit at a different position in the new configuration
          int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
          pSyncNode->senders[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, dnode:%d, %p, reset:%d",
                  oldreplicaIndex, i, DID(&pSyncNode->replicasId[i]), pSyncNode->senders[i], reset);

          break;
        }
      }
    }

    // create a sender for every slot that is still empty
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (pSyncNode->senders[i] == NULL) {
        pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
        if (pSyncNode->senders[i] == NULL) {
          // will be created later while send snapshot
          // (logged through the node: there is no sender object to log through)
          sNError(pSyncNode, "snapshot sender create failed while reconfig");
        } else {
          sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
        }
      } else {
        sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
      }
    }

    // destroy the old senders that were not carried over
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    syncWriteCfgFile(pSyncNode);

    // re-run the transition for the role the node currently holds
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, "");

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, "");
    }
  } else {
    // persist cfg
    syncWriteCfgFile(pSyncNode);
    sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.replicaNum, pNewConfig->replicaNum);
}

M
Minghao Li 已提交
1634 1635
// raft state change --------------
// Adopts `term` when it is greater than the stored current term: stores it, turns the node into a
// follower (with a reason string naming the new term) and clears the stored vote. Does nothing otherwise.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->raftStore.currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1645
// Stores `term` as the current term when it is greater than the stored one; the node's role is not touched.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->raftStore.currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode, term);
}

M
Minghao Li 已提交
1651
// Steps the node down to follower in response to `newTerm`.
//
// - newTerm lower than the current term: ignored (trace only).
// - newTerm higher than the current term: the term is stored, the node becomes follower and the stored
//   vote is cleared.
// - newTerm equal to the current term: the node becomes follower unless it already is one.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  if (pSyncNode->raftStore.currentTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
            pSyncNode->raftStore.currentTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm,
          pSyncNode->raftStore.currentTerm);

  if (pSyncNode->raftStore.currentTerm == newTerm) {
    // same term: only the role may need to change
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // newTerm is ahead of the stored term
  raftStoreSetTerm(pSyncNode, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode);
}

1677 1678
// Forwards to syncRespCleanRsp() on the node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1679
// Switches the node into the follower role.
//
// debugStr is appended to the trace line written at the end and is not interpreted otherwise.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // a node that was leader drops its cached leader id
  if (TAOS_SYNC_STATE_LEADER == pSyncNode->state) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->hbSlowNum = 0;

  // role switch: followers do not run the heartbeat timer, and the election timer starts over
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1730
// Switches the node into the leader role.
//
// debugStr is appended to the info line written at the end and is not interpreted otherwise.
//
// Every nextIndex slot is set to (last index + 1) and every matchIndex slot to SYNC_INDEX_INVALID, where
// the last index comes from syncNodeGetLastIndexTerm(). The value does not depend on the slot, so it is
// looked up once before the loop.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  pSyncNode->becomeLeaderNum++;
  pSyncNode->hbrSlowNum = 0;

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex of every replica starts right after the local last index
  SyncIndex lastIndex;
  SyncTerm  lastTerm;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // matchIndex of every replica is unknown (this also overwrites the local slot, which is harmless)
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // close receiver (pSyncNode itself was already dereferenced above, so only the receiver is checked)
  if (pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // notify the state machine, if it registered a callback
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // reset log buffer
  syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);

  // trace log
  sNInfo(pSyncNode, "become leader %s", debugStr);
}

// Candidate -> leader transition.
//
// Asserts that the node is a candidate. If voteGrantedMajority() does not confirm a majority, an error is
// logged and nothing changes. Otherwise the node becomes leader and a no-op entry is appended; a failure
// to append is logged but does not undo the transition.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  if (!voteGrantedMajority(pSyncNode->pVotesGranted)) {
    sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  if (syncNodeAppendNoop(pSyncNode) < 0) {
    sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr());
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  ASSERT(logLastIndex >= 0);
  sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "",
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLastIndex);
}

M
Minghao Li 已提交
1827 1828
// Returns true when the node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1829
// Resets every entry of pSyncNode->peerStates: lastSendIndex becomes SYNC_INDEX_INVALID and lastSendTime
// becomes 0. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate transition. Asserts that the node is a follower, changes only the state field and
// logs term, commit index and the log store's last index.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower transition. Asserts that the node is a leader, runs syncNodeBecomeFollower() and logs
// term, commit index and the log store's last index.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode, "leader to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower transition. Asserts that the node is a candidate, runs syncNodeBecomeFollower()
// and logs term, commit index and the log store's last index.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
        pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, logLastIndex);

  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1868 1869
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1870
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
S
Shengliang Guan 已提交
1871
  ASSERT(term == pSyncNode->raftStore.currentTerm);
1872 1873
  bool voted = raftStoreHasVoted(pSyncNode);
  ASSERT(!voted);
M
Minghao Li 已提交
1874

S
Shengliang Guan 已提交
1875
  raftStoreVote(pSyncNode, pRaftId);
M
Minghao Li 已提交
1876 1877
}

M
Minghao Li 已提交
1878
// simulate get vote from outside
M
Minghao Li 已提交
1879
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1880
  syncNodeVoteForTerm(pSyncNode, pSyncNode->raftStore.currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1881

S
Shengliang Guan 已提交
1882 1883
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1884
  if (ret != 0) return;
M
Minghao Li 已提交
1885

S
Shengliang Guan 已提交
1886
  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1887 1888
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
S
Shengliang Guan 已提交
1889
  pMsg->term = pSyncNode->raftStore.currentTerm;
M
Minghao Li 已提交
1890 1891 1892 1893
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1894
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1895 1896
}

M
Minghao Li 已提交
1897
// return if has a snapshot
M
Minghao Li 已提交
1898 1899
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1900
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1901 1902
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1903 1904 1905 1906 1907 1908 1909
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1910 1911
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1912
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1913
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1914 1915
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1916 1917 1918 1919 1920 1921 1922
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1923 1924
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1925 1926
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1927 1928
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1929
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1930 1931
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1932 1933
    }

M
Minghao Li 已提交
1934 1935 1936
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1937 1938 1939 1940
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1941
  } else {
M
Minghao Li 已提交
1942 1943
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1944
  }
M
Minghao Li 已提交
1945

M
Minghao Li 已提交
1946 1947 1948 1949 1950 1951 1952
  return lastTerm;
}

// Writes syncNodeGetLastIndex() to *pLastIndex and syncNodeGetLastTerm() to *pLastTerm.
// Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastIndex = lastIndex;

  SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
  *pLastTerm = lastTerm;

  return 0;
}
M
Minghao Li 已提交
1955

M
Minghao Li 已提交
1956
// return append-entries first try index
M
Minghao Li 已提交
1957 1958 1959 1960 1961
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1962 1963
// if index > 0, return index - 1
// else, return -1
1964 1965 1966 1967 1968 1969 1970 1971 1972
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1973 1974 1975 1976
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1977 1978 1979 1980 1981 1982 1983 1984 1985
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1986 1987 1988
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1989
  SSyncRaftEntry* pPreEntry = NULL;
1990 1991 1992 1993 1994 1995 1996
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

1997
    pSyncNode->pLogStore->cacheHit++;
1998 1999 2000
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
2001
    pSyncNode->pLogStore->cacheMiss++;
2002 2003 2004 2005
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
2006 2007 2008 2009 2010 2011

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

2012
  if (code == 0) {
2013
    ASSERT(pPreEntry != NULL);
2014
    preTerm = pPreEntry->term;
2015 2016 2017 2018

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
2019
      syncEntryDestroy(pPreEntry);
2020 2021
    }

2022 2023
    return preTerm;
  } else {
2024 2025 2026 2027
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
2028 2029 2030 2031
      }
    }
  }

2032
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
2033
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
2034 2035
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2036 2037 2038 2039

// Writes syncNodeGetPreIndex(index) to *pPreIndex and syncNodeGetPreTerm(index) to *pPreTerm.
// Always returns 0.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = preIndex;

  SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = preTerm;

  return 0;
}

M
Minghao Li 已提交
2044
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2045
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2046

S
Shengliang Guan 已提交
2047 2048 2049
  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2050
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
2051 2052
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
2053
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
2054 2055
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
2056
    }
M
Minghao Li 已提交
2057

M
Minghao Li 已提交
2058
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
2059 2060
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
2061
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
2062 2063
      rpcFreeCont(rpcMsg.pCont);
      return;
2064
    }
M
Minghao Li 已提交
2065

S
Shengliang Guan 已提交
2066
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
2067
  }
M
Minghao Li 已提交
2068 2069
}

M
Minghao Li 已提交
2070
static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2071
  if (!syncIsInit()) return;
M
Minghao Li 已提交
2072

M
Minghao Li 已提交
2073 2074
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
2075

2076
  if (pNode == NULL) return;
M
Minghao Li 已提交
2077 2078 2079 2080 2081

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
2082

2083
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
2084 2085 2086 2087
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
M
Minghao Li 已提交
2088

S
Shengliang Guan 已提交
2089
  SRpcMsg rpcMsg = {0};
2090 2091
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
2092

S
Shengliang Guan 已提交
2093
  if (code != 0) {
M
Minghao Li 已提交
2094
    sError("failed to build elect msg");
M
Minghao Li 已提交
2095
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
2096
    return;
M
Minghao Li 已提交
2097 2098
  }

S
Shengliang Guan 已提交
2099
  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
2100
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
2101 2102 2103

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
2104
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
2105
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
2106
    syncNodeRelease(pNode);
2107
    return;
M
Minghao Li 已提交
2108
  }
M
Minghao Li 已提交
2109 2110

  syncNodeRelease(pNode);
M
Minghao Li 已提交
2111 2112
}

M
Minghao Li 已提交
2113
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2114
  if (!syncIsInit()) return;
2115

S
Shengliang Guan 已提交
2116 2117 2118 2119
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2120
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2121 2122 2123
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2124
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2125
        return;
2126
      }
M
Minghao Li 已提交
2127

2128
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2129 2130
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2131
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2132 2133
        rpcFreeCont(rpcMsg.pCont);
        return;
2134
      }
S
Shengliang Guan 已提交
2135 2136 2137 2138

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2139
    } else {
S
Shengliang Guan 已提交
2140 2141
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2142
    }
M
Minghao Li 已提交
2143 2144 2145
  }
}

2146
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2147
  int64_t hbDataRid = (int64_t)param;
2148
  int64_t tsNow = taosGetTimestampMs();
2149

2150 2151
  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2152
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2153 2154
    return;
  }
2155

2156
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2157
  if (pSyncNode == NULL) {
2158
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2159
    sError("hb timer get pSyncNode NULL");
2160 2161 2162 2163 2164 2165 2166 2167
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2168
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2169 2170 2171
    return;
  }

M
Minghao Li 已提交
2172
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2173 2174
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2175
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2176 2177 2178
    return;
  }

M
Minghao Li 已提交
2179
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2180 2181

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2182 2183 2184
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2185
    if (timerLogicClock == msgLogicClock) {
2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
S
Shengliang Guan 已提交
2202
        pSyncMsg->term = pSyncNode->raftStore.currentTerm;
2203 2204 2205
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2206
        pSyncMsg->timeStamp = tsNow;
2207 2208 2209 2210 2211 2212

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2213 2214
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2215 2216 2217 2218 2219 2220 2221 2222
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2223 2224
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2225 2226
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2227 2228 2229 2230
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2231
    } else {
M
Minghao Li 已提交
2232 2233
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2234 2235
    }
  }
2236 2237 2238

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2239 2240
}

2241 2242 2243 2244 2245
// Proposes a noop entry by enqueueing it as a client request through pNode->syncEqMsg.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_SYN_NOT_LEADER when the node is
// not leader, -1 when the noop entry cannot be built, and the non-zero code of the failing
// step when building or enqueueing the request fails.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  // only a leader may propose; the check was previously inverted and rejected leaders
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->raftStore.currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  // the entry is no longer needed once the request has been built from it
  syncEntryDestroy(pEntry);
  if (code != 0) {
    sError("failed to propose noop msg while build since %s", terrstr());
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    // same convention as the timer callbacks in this file: free what was not enqueued
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2265 2266
// Deleter passed to taosLRUCacheInsert by syncCacheEntry: frees the cached value.
// `key` and `keyLen` are not used.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

2267 2268 2269 2270
// Inserts pEntry into the log store's LRU cache, keyed by the entry index, with low priority
// and deleteCacheEntry as deleter. The handle slot `h` is passed through to the cache insert.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  sNTrace(pStoreData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  // charge = entry header plus payload
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus inserted = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                          deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (inserted == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

B
Benguang Zhao 已提交
2282
// Appends pEntry to the node's log buffer and proceeds the match index.
//
// On the two early failure paths (entry shorter than SMsgHead, or the log buffer refusing the
// entry) pEntry is destroyed here and -1 is returned; the second one also sets terrno to
// TSDB_CODE_SYN_BUFFER_FULL and reports that code through syncLogFsmExecute.
// With more than one replica the function returns 0 right after proceeding. With a single
// replica it also updates the commit index and commits, returning -1 if the commit fails.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  if (pEntry->dataLen < sizeof(SMsgHead)) {
    sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
           TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // hand the entry to the log buffer
  int32_t appendRet = syncLogBufferAppend(ths->pLogBuf, ths, pEntry);
  if (appendRet < 0) {
    sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->raftStore.currentTerm, pEntry, TSDB_CODE_SYN_BUFFER_FULL);
    syncEntryDestroy(pEntry);
    return -1;
  }

  // proceed match index, with replicating on needed
  SyncIndex newMatchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);

  sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
         ", %" PRId64 ")",
         ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
         ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);

  // only the single-replica case commits right here
  if (ths->replicaNum <= 1) {
    (void)syncNodeUpdateCommitIndex(ths, newMatchIndex);

    if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
      sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
      return -1;
    }
  }

  return 0;
}

2323
// Returns true when at least `quorum` peers have a recorded heartbeat-reply receive time that
// is older than tsHeartbeatTimeout. Peers whose receive time is 0 or -1 are not counted.
// Always false for a single-replica node.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int32_t expiredPeers = 0;
  int64_t now = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    int64_t lastRecv = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    bool    hasRecvTime = (lastRecv != 0 && lastRecv != -1);

    if (hasRecvTime && now - lastRecv > tsHeartbeatTimeout) {
      ++expiredPeers;
    }
  }

  return expiredPeers >= pSyncNode->quorum;
}

2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364
// Returns true when any of the node's snapshot senders exists and has its `start` flag set.
// A NULL node yields false.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t idx = 0; idx < pSyncNode->replicaNum; ++idx) {
    if (pSyncNode->senders[idx] == NULL) continue;
    if (pSyncNode->senders[idx]->start) return true;
  }

  return false;
}

// Returns true when the node has a new-node snapshot receiver whose `start` flag is set.
// A NULL node or a missing receiver yields false.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) {
    return false;
  }

  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2365
// Builds a noop entry at the log buffer's end index for the current term and appends it via
// syncNodeAppend.
//
// Returns the result of syncNodeAppend (previously the result was dropped and 0 was always
// returned), or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built.
// The entry is not touched here after the call to syncNodeAppend.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->raftStore.currentTerm;

  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return syncNodeAppend(ths, pEntry);
}

// Older noop append path that writes directly to the log store.
//
// Builds a noop entry at the log store's write index. When the node is leader the entry is
// appended to the log store and then offered to the LRU cache. If a cache handle was obtained
// it is released; otherwise the entry is destroyed here.
// Returns 0, or -1 when the log store append fails (the entry is destroyed on that path too,
// where it was previously leaked).
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->raftStore.currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
    if (code != 0) {
      sError("append noop error");
      // nothing has been cached yet, so the entry is still owned by this function
      syncEntryDestroy(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestroy(pEntry);
  }

  return ret;
}

S
Shengliang Guan 已提交
2408 2409
// Handles a heartbeat message and replies to its sender.
//
// - Same term and this node is not leader: records the receive time, resets the election
//   timer and stores the sender's minMatchIndex; a follower additionally enqueues a
//   SYNC_LOCAL_CMD_FOLLOWER_CMT local command carrying the sender's commit index.
// - Term >= current term and this node is not follower: enqueues a SYNC_LOCAL_CMD_STEP_DOWN
//   local command.
// - Finally a heartbeat reply is sent back to the sender.
//
// Local commands are only enqueued when both ths->syncEqMsg and ths->msgcb are set; a message
// that is not enqueued (no callback, or enqueue failure) is freed here. Values logged after a
// successful enqueue are taken from the incoming message, not from the enqueued one.
//
// Returns 0, or -1 when the reply message could not be built (nothing is processed then).
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);

  SRpcMsg rpcMsg = {0};
  (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);

  SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
  if (pMsgReply == NULL) {
    sError("vgId:%d, failed to build heartbeat reply", ths->vgId);
    return -1;
  }

  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->raftStore.currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number
  pMsgReply->startTime = ths->startTime;
  pMsgReply->timeStamp = tsMs;

  if (pMsg->term == ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);

    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      // syncNodeFollowerCommit(ths, pMsg->commitIndex);
      SRpcMsg rpcMsgLocalCmd = {0};
      (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      if (pSyncMsg == NULL) {
        sError("vgId:%d, failed to build fc-commit msg", ths->vgId);
      } else {
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->commitIndex = pMsg->commitIndex;
        pSyncMsg->currentTerm = pMsg->term;
        SyncIndex fcIndex = pMsg->commitIndex;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          // nobody to hand the message to: release it instead of leaking it
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->raftStore.currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // syncNodeStepDown(ths, pMsg->term);
    SRpcMsg rpcMsgLocalCmd = {0};
    (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);

    SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
    if (pSyncMsg == NULL) {
      sError("vgId:%d, failed to build step-down msg", ths->vgId);
    } else {
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->currentTerm = pMsg->term;
      pSyncMsg->commitIndex = pMsg->commitIndex;
      SyncTerm newTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          // do not read pSyncMsg here: the message has been handed to the queue
          sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, newTerm);
        }
      } else {
        // nobody to hand the message to: release it instead of leaking it
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  /*
    // htonl
    SMsgHead* pHead = rpcMsg.pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = htonl(pHead->vgId);
  */

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2492
// Handles a heartbeat reply: logs it, records the receive time for the replying peer in
// ths->pMatchIndex and forwards the reply to that peer's log replication manager.
// Returns -1 when no replication manager exists for the sender, otherwise the result of
// syncLogReplMgrProcessHeartbeatReply.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  SSyncLogReplMgr*    pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
  if (pMgr == NULL) {
    // "%016" PRIx64 zero-pads the address to 16 hex digits; the former "0x016%..." printed a
    // literal "016" in front of an unpadded value
    sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x%016" PRIx64 "", ths->vgId, pMsg->srcId.addr);
    return -1;
  }

  int64_t tsMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pMsg, tsMs - pMsg->timeStamp, tbuf);

  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);

  return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}

2512
// Older heartbeat-reply handler: logs the reply and records its receive time for the sending
// peer in ths->pMatchIndex. Always returns 0.
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;

  const STraceId* trace = &pRpcMsg->info.traceId;
  char            tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  int64_t nowMs = taosGetTimestampMs();
  syncLogRecvHeartbeatReply(ths, pReply, nowMs - pReply->timeStamp, tbuf);

  // remember when this peer last answered; used to decide whether the peer is alive
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, nowMs);

  return 0;
}

S
Shengliang Guan 已提交
2528 2529
// Executes a local command message.
//   SYNC_LOCAL_CMD_STEP_DOWN    - steps down to the term carried by the message.
//   SYNC_LOCAL_CMD_FOLLOWER_CMT - updates the commit index when the message term equals the
//                                 log buffer's last match term, then commits up to
//                                 ths->commitIndex (a commit failure is only logged).
// Any other command is logged as an error. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  switch (pCmd->cmd) {
    case SYNC_LOCAL_CMD_STEP_DOWN:
      syncNodeStepDown(ths, pCmd->currentTerm);
      break;

    case SYNC_LOCAL_CMD_FOLLOWER_CMT: {
      SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
      if (pCmd->currentTerm == lastMatchTerm) {
        (void)syncNodeUpdateCommitIndex(ths, pCmd->commitIndex);
      }

      if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
        sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
               ths->commitIndex);
      }
      break;
    }

    default:
      sError("error local cmd");
      break;
  }

  return 0;
}

M
Minghao Li 已提交
2551 2552 2553 2554 2555 2556 2557 2558 2559 2560
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2561

2562
// Turns a client request into a raft entry at the log buffer's end index and, when this node
// is leader, appends it via syncNodeAppend.
//
// pRetIndex, when not NULL, receives the entry index on the leader path.
// Returns the result of syncNodeAppend on the leader path; -1 when the entry cannot be built
// or when the node is not leader (the entry is destroyed in that case).
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  sNTrace(ths, "on client request");

  SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  term = ths->raftStore.currentTerm;

  // TDMT_SYNC_CLIENT_REQUEST messages are converted from their content, others from the rpc msg
  SSyncRaftEntry* pEntry = (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST)
                               ? syncEntryBuildFromClientRequest(pMsg->pCont, term, index)
                               : syncEntryBuildFromRpcMsg(pMsg, term, index);
  if (pEntry == NULL) {
    sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
    return -1;
  }

  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    syncEntryDestroy(pEntry);
    return -1;
  }

  if (pRetIndex != NULL) {
    *pRetIndex = index;
  }

  return syncNodeAppend(ths, pEntry);
}

S
Shengliang Guan 已提交
2595 2596 2597
// Maps a sync state to its lower-case name; states not listed map to "unknown".
const char* syncStr(ESyncState state) {
  const char* name = "unknown";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    case TAOS_SYNC_STATE_ERROR:
      name = "error";
      break;
    case TAOS_SYNC_STATE_OFFLINE:
      name = "offline";
      break;
    default:
      break;
  }

  return name;
}
2611

2612
#if 0
// Disabled (compiled out). Follower-side handling of a leader-transfer entry.
// Skips with return 0 unless this node is a follower, has finished restore, and the entry is
// not behind the current term / last index. When the transfer target is this node (same raft
// id, or same fqdn and port) the election timer is restarted with a 1 ms timeout. Finally the
// FSM's leader-transfer callback, if any, is invoked. Always returns 0.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->raftStore.currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  /*
    if (ths->vgId > 1) {
      sNTrace(ths, "I am vnode, can not do leader transfer");
      return 0;
    }
  */

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool idMatches = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool addrMatches = strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                     pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  if (idMatches || addrMatches) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->raftStore.currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  return 0;
}

#endif

2679
// Finds this node in pNewCfg->nodeInfo (matching on address and vgId against ths->myRaftId)
// and stores its position in pNewCfg->myIndex.
// Returns 0 when found, -1 when this node is not part of the new config.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  for (int32_t pos = 0; pos < pNewCfg->replicaNum; ++pos) {
    SRaftId candidate = {
        .addr = SYNC_ADDR(&pNewCfg->nodeInfo[pos]),
        .vgId = ths->vgId,
    };

    if (!syncUtilSameId(&(ths->myRaftId), &candidate)) {
      continue;
    }

    pNewCfg->myIndex = pos;
    return 0;
  }

  return -1;
}

2695 2696 2697 2698
// True when all three hold: the node has exactly one replica, syncUtilUserCommit() accepts
// the message type, and the node's vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) return false;
  if (!syncUtilUserCommit(pMsg->msgType)) return false;
  return ths->vgId != 1;
}

2699
// Returns true when pRaftId equals one of the ids in ths->replicasId.
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int32_t idx = 0;

  while (idx < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[idx], pRaftId)) {
      return true;
    }
    ++idx;
  }

  return false;
}

// Looks up the snapshot sender stored at the replica slot whose id equals pDestId.
// All slots are scanned, so the last matching slot wins. Returns NULL when none matches.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;

  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) continue;
    pFound = ths->senders[slot];
  }

  return pFound;
}
M
Minghao Li 已提交
2717

2718 2719
// Looks up the peer heartbeat timer at the replica slot whose id equals pDestId.
// All slots are scanned, so the last matching slot wins. Returns NULL when none matches.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;

  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) continue;
    pFound = &ths->peerHeartbeatTimerArr[slot];
  }

  return pFound;
}

M
Minghao Li 已提交
2728 2729
// Looks up the peer state at the replica slot whose id equals pDestId.
// All slots are scanned, so the last matching slot wins. Returns NULL when none matches.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;

  for (int32_t slot = 0; slot < ths->replicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) continue;
    pFound = &ths->peerStates[slot];
  }

  return pFound;
}

// Decides whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for the destination, or when the same index
// (pMsg->prevLogIndex + 1) was last sent less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago;
// true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextToSend = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextToSend);
  bool sentRecently = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768
// Returns true when none of the following holds (each refusal is logged):
//   1. pSyncNode->changing is already set;
//   2. the commit index is at or past SYNC_INDEX_BEGIN but differs from the last log index;
//   3. a snapshot sender for one of the peers has its `start` flag set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  // the last index is only queried once the commit index has reached SYNC_INDEX_BEGIN
  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender == NULL) continue;

    if (pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}