syncMain.c 87.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
S
Shengliang Guan 已提交
48 49 50 51 52 53 54 55 56 57 58 59
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool    syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void    syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool    syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);

static bool    syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);

static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
static SyncIndex     syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
M
Minghao Li 已提交
60

61
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  /*
   * Opens a sync node from pSyncInfo and registers it with syncNodeAdd().
   * If both succeed, the ping/election/heartbeat intervals and msgcb are copied
   * onto the node.
   *
   * Returns the node's rid, or -1 if opening or registration failed.
   */
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: close the node that was just opened
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  // message callbacks
  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
84 85 86 87
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
88 89 90
  }
}

M
Minghao Li 已提交
91
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
92
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
93
  if (pSyncNode != NULL) {
94
    pSyncNode->isStart = false;
S
Shengliang Guan 已提交
95
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
96
    syncNodeRemove(rid);
M
Minghao Li 已提交
97
  }
S
Shengliang Guan 已提交
98
}
M
Minghao Li 已提交
99

M
Minghao Li 已提交
100 101
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
102 103 104 105
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
106 107
}

S
Shengliang Guan 已提交
108 109 110
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  // A new config is acceptable only if this node is still a member of it ...
  if (syncNodeInConfig(pSyncNode, pCfg) == false) {
    return false;
  }

  // ... and its replica count differs from the current one by at most one.
  int delta = pCfg->replicaNum - pSyncNode->replicaNum;
  return delta >= -1 && delta <= 1;
}

S
Shengliang Guan 已提交
113
/**
 * Applies a new replica configuration to the node registered under rid.
 *
 * The config is validated with syncNodeCheckNewConfig(). Then the new config
 * index is recorded and syncNodeDoConfigChange() is applied. On a leader, the
 * heartbeat timers are re-initialised for the new replica set and replication
 * is kicked off.
 *
 * @param rid      node reference id
 * @param pNewCfg  proposed configuration
 * @return 0 on success; -1 if rid is unknown or the config is rejected
 *         (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR in the latter case).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: once syncNodeRelease() returns,
    // pSyncNode is no longer guaranteed to be valid.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // rebuild per-replica heartbeat timers against the (possibly changed) replica ids
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
141

S
Shengliang Guan 已提交
142 143 144 145
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  /*
   * Dispatches an incoming sync message to the handler matching its msgType.
   * Returns the handler's result, or -1 if the sync module is not initialised,
   * rid is unknown, or the message type is not a sync message.
   */
  if (!syncIsInit()) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }

  int32_t result;
  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT: {
      result = syncNodeOnHeartbeat(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_HEARTBEAT_REPLY: {
      result = syncNodeOnHeartbeatReply(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_TIMEOUT: {
      result = syncNodeOnTimeout(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_CLIENT_REQUEST: {
      result = syncNodeOnClientRequest(pNode, pMsg, NULL);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE: {
      result = syncNodeOnRequestVote(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE_REPLY: {
      result = syncNodeOnRequestVoteReply(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES: {
      result = syncNodeOnAppendEntries(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES_REPLY: {
      result = syncNodeOnAppendEntriesReply(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_SEND: {
      result = syncNodeOnSnapshot(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_RSP: {
      result = syncNodeOnSnapshotReply(pNode, pMsg);
      break;
    }
    case TDMT_SYNC_LOCAL_CMD: {
      result = syncNodeOnLocalCmd(pNode, pMsg);
      break;
    }
    default: {
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pNode->vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      result = -1;
      break;
    }
  }

  syncNodeRelease(pNode);
  return result;
}

S
Shengliang Guan 已提交
193
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
194
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
195
  if (pSyncNode == NULL) return -1;
196

S
Shengliang Guan 已提交
197
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
198
  syncNodeRelease(pSyncNode);
199 200 201
  return ret;
}

M
Minghao Li 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  // Smallest match index across all peers; SYNC_INDEX_INVALID when there are none.
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex candidate = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[peer]));
    // the first peer seeds the minimum; later peers only lower it
    if (peer == 0 || candidate < lowest) {
      lowest = candidate;
    }
  }

  return lowest;
}

218
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
219
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
220
  if (pSyncNode == NULL) {
221
    sError("sync begin snapshot error");
222 223
    return -1;
  }
224

225 226
  int32_t code = 0;

M
Minghao Li 已提交
227
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
228 229 230
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
231 232 233
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
234 235 236
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
237 238
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
239
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
240 241 242
      return 0;
    }

M
Minghao Li 已提交
243 244 245
    goto _DEL_WAL;

  } else {
246 247 248 249 250 251 252 253 254 255 256 257
    lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;

    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
      syncNodeRelease(pSyncNode);
      return 0;
    }

M
Minghao Li 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
272 273 274 275
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
276 277
            } while (0);

S
Shengliang Guan 已提交
278
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
279 280 281 282 283 284
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
285 286 287
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
288
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
289 290 291 292
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
293
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
294
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
295 296 297
        return 0;

      } else {
S
Shengliang Guan 已提交
298
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
299
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
300 301 302 303 304 305 306 307 308
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
309 310 311
    }
  }

M
Minghao Li 已提交
312
_DEL_WAL:
313

M
Minghao Li 已提交
314
  do {
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    SyncIndex          snapshotVer = walGetSnapshotVer(pData->pWal);
    SyncIndex          walCommitVer = walGetCommittedVer(pData->pWal);
    SyncIndex          wallastVer = walGetLastVer(pData->pWal);
    if (lastApplyIndex <= walCommitVer) {
      SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

      if (snapshottingIndex == SYNC_INDEX_INVALID) {
        atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
        pSyncNode->snapshottingTime = taosGetTimestampMs();

        code = walBeginSnapshot(pData->pWal, lastApplyIndex);
        if (code == 0) {
          sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                  pSyncNode->snapshottingIndex, lastApplyIndex);
        } else {
          sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
                  terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
          atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
        }
335

M
Minghao Li 已提交
336
      } else {
337 338
        sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
                snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
339
      }
340
    }
M
Minghao Li 已提交
341
  } while (0);
342

S
Shengliang Guan 已提交
343
  syncNodeRelease(pSyncNode);
344 345 346 347
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
348
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
349
  if (pSyncNode == NULL) {
350
    sError("sync end snapshot error");
351 352 353
    return -1;
  }

354 355 356 357
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
358
    if (code != 0) {
359
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
360
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
361 362
      return -1;
    } else {
S
Shengliang Guan 已提交
363
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
364 365
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
366
  }
367

S
Shengliang Guan 已提交
368
  syncNodeRelease(pSyncNode);
369 370 371
  return code;
}

M
Minghao Li 已提交
372
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  // Calls syncNodeStepDown() with newTerm on the node under rid; -1 if rid is unknown.
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync step down error");
    return -1;
  }

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);
  return 0;
}

384 385 386
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
387
    sError("sync ready for read error");
388 389 390 391 392 393 394 395 396 397
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
398 399 400
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
401

402 403
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
404
        SyncIndex       lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
405
        SSyncRaftEntry* pEntry = NULL;
406 407 408 409 410 411 412
        SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
        int32_t         code = 0;
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
          code = 0;

413
          pSyncNode->pLogStore->cacheHit++;
414 415 416
          sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);

        } else {
417
          pSyncNode->pLogStore->cacheMiss++;
418 419 420 421 422
          sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);

          code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
        }

423 424 425 426 427
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

428 429 430 431 432
          if (h) {
            taosLRUCacheRelease(pCache, h, false);
          } else {
            syncEntryDestory(pEntry);
          }
433
        }
434 435 436 437
      }
    }
  }

438 439 440 441 442 443 444 445
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

446 447 448 449
  syncNodeRelease(pSyncNode);
  return ready;
}

450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
bool syncSnapshotSending(int64_t rid) {
  // syncNodeSnapshotSending() for the node under rid; false if rid is unknown.
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool sending = syncNodeSnapshotSending(pNode);
  syncNodeRelease(pNode);
  return sending;
}

bool syncSnapshotRecving(int64_t rid) {
  // syncNodeSnapshotRecving() for the node under rid; false if rid is unknown.
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return false;

  bool receiving = syncNodeSnapshotRecving(pNode);
  syncNodeRelease(pNode);
  return receiving;
}

M
Minghao Li 已提交
472 473
/**
 * Hands leadership to the most up-to-date peer.
 *
 * Fails with TSDB_CODE_SYN_ONE_REPLICA when the node has no peers. It does
 * nothing (returns 0) unless this node is leader of a group with more than one
 * replica. Otherwise the target is the peer with the highest match index; on a
 * tie, the lowest peer slot wins. Previously only the two-peer case compared
 * match indexes; with more peers, slot 0 was always chosen even when it lagged.
 *
 * @return 0 when nothing was done, otherwise the result of syncNodeLeaderTransferTo().
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER || pSyncNode->replicaNum <= 1) {
    return 0;
  }

  int32_t   bestPeer = 0;
  SyncIndex bestMatch = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
  for (int32_t i = 1; i < pSyncNode->peersNum; ++i) {
    SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
    if (matchIndex > bestMatch) {  // strict '>' keeps the lowest slot on ties
      bestMatch = matchIndex;
      bestPeer = i;
    }
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[bestPeer];
  return syncNodeLeaderTransferTo(pSyncNode, newLeader);
}

/**
 * Proposes a leader-transfer entry naming newLeader as the next leader.
 *
 * @return the syncNodePropose() result. Returns -1 with
 *         terrno = TSDB_CODE_SYN_ONE_REPLICA for a single-replica group, and
 *         -1 if the transfer message could not be built.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId);
  // The payload is written through below, so a failed build must not fall
  // through to a NULL dereference.
  if (code != 0 || rpcMsg.pCont == NULL) {
    sError("vgId:%d, failed to build leader transfer msg since %s", pSyncNode->vgId, terrstr());
    if (rpcMsg.pCont != NULL) {
      rpcFreeCont(rpcMsg.pCont);
    }
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
  rpcFreeCont(rpcMsg.pCont);
  return ret;
}

517 518
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
519

S
Shengliang Guan 已提交
520
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
521 522 523 524
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
525 526
  }

527
  return state;
M
Minghao Li 已提交
528 529
}

530
#if 0
// Disabled snapshot-metadata queries; nothing in this region is compiled.

int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry);
  if (code != 0) {
    if (pEntry != NULL) syncEntryDestory(pEntry);
    syncNodeRelease(pNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pNode);
  return 0;
}

int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  sMeta->lastConfigIndex = pNode->pRaftCfg->lastConfigIndex;
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pNode->vgId, pNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}

int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;
  ASSERT(rid == pNode->rid);

  ASSERT(pNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex lastIndex = pNode->pRaftCfg->configIndexArr[0];
  for (int32_t i = 0; i < pNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = pNode->pRaftCfg->configIndexArr[i];
    if (candidate > lastIndex && candidate <= snapshotIndex) lastIndex = candidate;
  }
  sMeta->lastConfigIndex = lastIndex;

  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
#endif
601

602 603 604 605
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  /*
   * Picks the config index to record with a snapshot ending at
   * snapshotLastApplyIndex. The result is the larger of configIndexArr[0] and
   * the largest recorded config index that does not exceed
   * snapshotLastApplyIndex. At least one config index must be recorded.
   */
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex chosen = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t k = 0; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[k];
    bool      isNewer = candidate > chosen;
    bool      isCovered = candidate <= snapshotLastApplyIndex;
    if (isNewer && isCovered) {
      chosen = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, chosen);
  return chosen;
}

S
Shengliang Guan 已提交
618
/**
 * Fills pEpSet with the endpoint (fqdn, port) of every replica in the node's
 * raft config, for clients to retry against.
 *
 * inUse is set to the slot after this node's own index (myIndex + 1, wrapping
 * modulo numOfEps). pEpSet->numOfEps is left at 0 when rid is unknown. The
 * replica count comes from the stored raft config, so it is clamped to the
 * capacity of pEpSet->eps[] rather than trusted blindly.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  // NOTE(review): assumes SEpSet::eps is a fixed-size array (required for this sizeof) — confirm.
  int32_t capacity = (int32_t)(sizeof(pEpSet->eps) / sizeof(pEpSet->eps[0]));
  int32_t replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  if (replicaNum > capacity) {
    sError("vgId:%d, replica num:%d exceeds epset capacity:%d, truncated", pSyncNode->vgId, replicaNum, capacity);
    replicaNum = capacity;
  }

  for (int32_t i = 0; i < replicaNum; ++i) {
    SEp* pEp = &pEpSet->eps[i];
    tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
638

M
Minghao Li 已提交
639
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  // Public wrapper around syncNodePropose(); -1 if rid is unknown.
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t result = syncNodePropose(pNode, pMsg, isWeak);
  syncNodeRelease(pNode);
  return result;
}

651
/**
 * Proposes pMsg to the raft group through pSyncNode.
 *
 * Rejected with -1 (terrno set) when:
 * - the node is not leader (TSDB_CODE_SYN_NOT_LEADER);
 * - a node with vgId != 1 has not finished restoring (TSDB_CODE_SYN_PROPOSE_NOT_READY);
 * - heartbeat replies have timed out (TSDB_CODE_SYN_PROPOSE_NOT_READY).
 *
 * When syncNodeIsOptimizedOneReplica() accepts the message, it is handled
 * synchronously by syncNodeOnClientRequest(). On success the assigned index and
 * current term are stored in pMsg->info.conn and 1 is returned.
 *
 * Otherwise a response stub is registered, the message is wrapped as a client
 * request and handed to pSyncNode->syncEqMsg. The enqueue result is returned
 * (0 on success).
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // heartbeat timeout
  if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // Initialised so the failure log below never reads an indeterminate value
    // if syncNodeOnClientRequest() fails before assigning it.
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code == 0) {
      pMsg->info.conn.applyIndex = retIndex;
      pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
      sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return 1;
    }

    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return -1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    // NOTE(review): not visible here whether syncEqMsg frees rpcMsg.pCont on failure — confirm no leak.
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  return code;
}

S
Shengliang Guan 已提交
712
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  // Reset pSyncTimer to an unarmed heartbeat timer aimed at destId, using the
  // node's heartbeat base line as its period. Always returns 0.
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  pSyncTimer->timeStamp = taosGetTimestampMs();
  atomic_store_64(&pSyncTimer->logicClock, 0);
  return 0;
}

S
Shengliang Guan 已提交
723
/**
 * Arms the per-peer heartbeat timer.
 *
 * The timer's SSyncHbTimerData is reused if its hbDataRid can still be
 * acquired; otherwise a new one is allocated and registered. The data records
 * the node rid, the timer, the destination, the current logic clock and the
 * next execution time. The timer fires after timerMS / HEARTBEAT_TICK_NUM ms
 * with the data's rid as its parameter.
 *
 * @return 0 on success or when the sync env is not initialised (logged);
 *         -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the timer data cannot be allocated.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  int64_t tsNow = taosGetTimestampMs();
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
  if (pData == NULL) {
    // zeroed so that any field not assigned below is not left indeterminate
    pData = taosMemoryCalloc(1, sizeof(SSyncHbTimerData));
    if (pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      sError("vgId:%d, failed to start hb timer since %s", pSyncNode->vgId, terrstr());
      return -1;
    }
    pData->rid = syncHbTimerDataAdd(pData);
  }
  pSyncTimer->hbDataRid = pData->rid;
  pSyncTimer->timeStamp = tsNow;

  pData->syncNodeRid = pSyncNode->rid;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;
  pData->execTime = tsNow + pSyncTimer->timerMS;

  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
               syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
749
// Cancel one peer's heartbeat timer.
//
// Steps, in order: bump the slot's logic clock, stop the OS timer, remove the
// SSyncHbTimerData record, and mark the slot as owning no record (-1). Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  syncHbTimerDataRemove(pSyncTimer->hbDataRid);
  pSyncTimer->hbDataRid = -1;

  return 0;
}

S
Shengliang Guan 已提交
759 760
// Create and initialise a sync node described by pSyncInfo.
//
// The node is built in this order:
//   1. Ensure the data directory exists.
//   2. Create raft_config.json from the caller's config, or reconcile an existing file with it.
//   3. Load the raft config and derive the self, peer and replica raft ids.
//   4. Open the raft store, vote managers, index managers and log store.
//   5. Seed commitIndex from the FSM snapshot, when the FSM exposes one.
//   6. Set up timers, the response manager and the snapshot senders/receiver.
//
// Ownership: on success pSyncInfo->pFsm is moved into the node and pSyncInfo->pFsm becomes
// NULL. On any failure the FSM is freed, the partially built node is released through
// syncNodeClose(), and NULL is returned. No timer is started here; that happens in
// syncNodeStart().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId before anything is logged; otherwise the "vgId:%d" messages below would print 0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file from the caller-supplied config
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // The file was just written from pSyncInfo->syncCfg, so there is nothing to copy back.
      // pSyncNode->pRaftCfg is not opened on this path and must not be dereferenced here.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo (vgId was already set above)
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  // Bounded, NUL-terminated copy. memcpy(..., sizeof(pSyncNode->path)) would read past the end
  // of a source path string that is shorter than the destination buffer.
  snprintf(pSyncNode->path, sizeof(pSyncNode->path), "%s", pSyncInfo->path);
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  // pSyncNode->configPath was already formatted above from the same path

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId: every replica except myIndex, in config order
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm; ownership of the FSM moves from pSyncInfo to the node
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // seed commitIndex from the FSM snapshot when one is available
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer with a random timeout in [electBaseLine, 2 * electBaseLine]
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  // NOTE(review): snapshotSenderCreate() results are not checked for NULL here.
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    (pSyncNode->senders)[i] = pSender;
    sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender);
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart()
  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  pSyncNode->isStart = true;
  pSyncNode->electNum = 0;
  pSyncNode->becomeLeaderNum = 0;
  pSyncNode->configChangeNum = 0;
  pSyncNode->hbSlowNum = 0;
  pSyncNode->hbrSlowNum = 0;
  pSyncNode->tmrRoutineNum = 0;

  sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);

  return pSyncNode;

_error:
  // the FSM is still owned by pSyncInfo if we failed before handing it to the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066
// Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the snapshot is ahead.
//
// No-op when the node has no FSM or the FSM does not provide FpGetSnapshotInfo. If the snapshot
// query fails, the error is logged, ASSERT is triggered, and commitIndex is left untouched.
// In builds where ASSERT does not abort, this keeps the code from reading the result struct
// after a failed call.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (code != 0) {
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    ASSERT(code == 0);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
1067 1068
// Start raft on an opened node.
//
// With more than one replica (or zero), the node starts as a follower. A single-replica node
// instead advances its term, becomes leader, appends a no-op entry and tries to advance the
// commit index. The ping timer is started in both cases.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
1086 1087 1088 1089 1090 1091 1092 1093 1094
// Start the node in stand-by mode.
//
// The node becomes a follower with heartbeat timers stopped. Its election timer is restarted
// with TIMER_MAX_MS so an election timeout is pushed far out, and the ping timer is started.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
1101 1102 1103 1104 1105 1106 1107 1108
// First phase of shutdown: stop the election timer, then the heartbeat timers.
// Everything else is released later by syncNodeClose().
void syncNodePreClose(SSyncNode* pSyncNode) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
}

1109 1110
// Release the memory of one heartbeat-timer data record.
void syncHbTimerDataFree(SSyncHbTimerData* pData) {
  taosMemoryFree(pData);
}

M
Minghao Li 已提交
1111
// Release every resource owned by a sync node, then free the node itself.
//
// Accepts NULL. syncNodeOpen() also calls this on its error path, with a node it has only
// partially initialised. Each member pointer is set to NULL right after it is released.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return;
  sNTrace(pSyncNode, "sync close, data:%p", pSyncNode);

  // Stop all timers before tearing down the state they operate on. The ping timer is armed
  // with the raw node pointer, so cancelling it after members were freed (the previous order)
  // left a window where a pending callback could see freed stores/managers.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1158
// Return the snapshot strategy stored in the node's raft config.
// pSyncNode->pRaftCfg must be open (non-NULL).
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1159

M
Minghao Li 已提交
1160 1161 1162
// timer control --------------
// Arm the ping timer with a pingTimerMS delay; the callback receives the node pointer itself.
// After arming, pingTimerLogicClock is set to pingTimerLogicClockUser. When the sync env is not
// initialised, the call only logs an error. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer: bump the user-side logic clock, cancel the OS timer and forget its
// handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
//
// The timer parameters are recorded in electTimerParam: the absolute execute time, the current
// electTimerLogicClock, a back-pointer to the node, and a NULL pData. The callback receives the
// node's ref id (rid), not the node pointer. When the sync env is not initialised, the call only
// logs an error. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  int64_t deadline = taosGetTimestampMs() + ms;
  atomic_store_64(&(pSyncNode->electTimerParam.executeTime), deadline);
  atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
  pSyncNode->electTimerParam.pSyncNode = pSyncNode;
  pSyncNode->electTimerParam.pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stop the election timer: advance electTimerLogicClock, cancel the OS timer and forget its
// handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer, then arm it again with a timeout of `ms`. The results of the
// stop/start helpers are not propagated; this always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
1217 1218
// Restart the election timer with a fresh timeout.
//
// Stand-by nodes use TIMER_MAX_MS. Other nodes pick a random timeout between electBaseLine and
// 2 * electBaseLine via syncUtilElectRandomMS(). Returns the result of
// syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return ret;
}

M
Minghao Li 已提交
1233
// Arm the node-wide heartbeat timer with a heartbeatTimerMS delay, passing the node pointer to
// the callback. After arming, heartbeatTimerLogicClock is set to heartbeatTimerLogicClockUser.
// When the sync env is not initialised, only an error is logged. A trace line is emitted in
// either case. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1247
// Start the heartbeat timer of every peer that has a timer slot; peers without one are skipped.
// The node-wide heartbeat timer is not armed here. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }

  return 0;
}

M
Minghao Li 已提交
1265 1266
// Stop the heartbeat timer of every peer that has a timer slot; peers without one are skipped.
// The node-wide heartbeat timer is not touched here. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }

  return 0;
}

1284 1285 1286 1287 1288 1289
// Stop all peer heartbeat timers, then start them again. Always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1290 1291 1292
// utils --------------
// Send pMsg to the node identified by destRaftId, without expecting a response.
//
// The payload is converted with syncUtilMsgHtoN() before sending and handed to the node's
// syncSendMSg callback. If no callback is installed, the payload is freed, an error is logged
// and -1 is returned; otherwise 0 is returned.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // htonl
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the node described by nodeInfo, without expecting a response.
//
// The payload is converted with syncUtilMsgHtoN() before sending and handed to the node's
// syncSendMSg callback. If no callback is installed, the payload is freed, an error is logged
// and -1 is returned, matching syncNodeSendMsgById(). Previously the payload leaked and 0 was
// returned in that case. Returns 0 on success.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1324
// Report whether this node is a member of `config`.
//
// Membership is checked twice: once by comparing fqdn and port with myNodeInfo, and once by
// comparing the derived raft id (address + vgId) with myRaftId. The two answers are asserted to
// agree, and the endpoint-based answer is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool matchedByEp = false;
  for (int32_t i = 0; !matchedByEp && i < config->replicaNum; ++i) {
    matchedByEp = strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
                  (config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort;
  }

  bool matchedById = false;
  for (int32_t i = 0; !matchedById && i < config->replicaNum; ++i) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort);
    candidate.vgId = pSyncNode->vgId;
    matchedById = syncUtilSameId(&candidate, &(pSyncNode->myRaftId));
  }

  ASSERT(matchedByEp == matchedById);
  return matchedByEp;
}

1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363
// Return true when the two sync configs differ in replica count, self index, or any
// replica's fqdn/port at the same position. Positions are compared pairwise, so a pure
// reordering of replicas also counts as a change.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    bool samePort = pOldCfg->nodeInfo[idx].nodePort == pNewCfg->nodeInfo[idx].nodePort;
    bool sameFqdn = strcmp(pOldCfg->nodeInfo[idx].nodeFqdn, pNewCfg->nodeInfo[idx].nodeFqdn) == 0;
    if (!(samePort && sameFqdn)) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1364
// Apply a membership change: install pNewConfig as the node's raft config, effective at
// lastConfigChangeIndex.
//
// Does nothing (beyond an info log) when pNewConfig equals the current config. Otherwise:
//   - the raft config and its lastConfigIndex are replaced and configChangeNum is bumped;
//   - isStandBy is cleared when this node is in the new config, and set when it was dropped;
//   - lastConfigChangeIndex is added to the config-index list and the config is persisted.
// If this node belongs to the new config, it also:
//   - rebuilds its self/peer/replica ids, quorum, index managers and vote managers;
//   - moves existing snapshot senders to the new slot of the same raft id, creates senders for
//     empty slots and destroys the unused ones;
//   - re-enters its current role: leader (plus a no-op append) or follower.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  pSyncNode->configChangeNum++;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change; both strings are reused by the log lines below
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save old replica ids and snapshot senders so they can be matched against the new layout
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId: every replica except myIndex, in config order
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerSlot = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerSlot] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerSlot++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear every slot first
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // move each old sender whose raft id is still a replica into that replica's new slot
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      bool reset = false;
      for (int32_t k = 0; k < TSDB_MAX_REPLICA; ++k) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[k]) && oldSenders[k] != NULL) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
          sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                  (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[k]);

          (pSyncNode->senders)[i] = oldSenders[k];
          oldSenders[k] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                  i, host, port, (pSyncNode->senders)[i], reset);

          break;
        }
      }
    }

    // create new senders for slots that were not filled above
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
      } else {
        sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
      }
    }

    // free old senders that were not moved
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }
    sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
           pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1540 1541 1542 1543
// raft state change --------------
// Adopt a strictly newer term: persist it, switch to follower and clear the
// vote recorded for the old term. Terms that are not newer are ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1551 1552 1553 1554 1555 1556
// Persist a strictly newer term but leave the role and the stored vote
// untouched (unlike syncNodeUpdateTerm). Older or equal terms are ignored.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  SyncTerm current = pSyncNode->pRaftStore->currentTerm;
  if (term <= current) return;
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1557
// Step down to follower after observing newTerm.
//  - newTerm older than the current term: ignored (trace only).
//  - newTerm equal to the current term: become follower unless already one.
//  - newTerm newer: persist it, become follower and clear the stored vote.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm curTerm = pSyncNode->pRaftStore->currentTerm;

  if (curTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (curTerm == newTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1583 1584
// Clean the pending client responses held by the response manager.
// Invoked on role change (e.g. from syncNodeBecomeFollower).
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1585
// Switch this node to the follower role. In order, this:
//  - drops the cached leader id if we were the leader;
//  - zeroes hbSlowNum;
//  - flips the state, stops the heartbeat timer and re-arms the election timer;
//  - cleans pending client responses (syncNodeLeaderChangeRsp);
//  - notifies the FSM through FpBecomeFollowerCb when one is registered;
//  - invalidates minMatchIndex.
// debugStr is only used in the trace log.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) pSyncNode->leaderCache = EMPTY_RAFT_ID;

  pSyncNode->hbSlowNum = 0;

  // role + timers: followers don't send heartbeats, they wait for them
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  syncNodeLeaderChangeRsp(pSyncNode);

  bool hasCb = (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL);
  if (hasCb) pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1633
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1634 1635
  pSyncNode->leaderTime = taosGetTimestampMs();

1636
  pSyncNode->becomeLeaderNum++;
1637
  pSyncNode->hbrSlowNum = 0;
1638

1639 1640 1641
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1642
  // state change
M
Minghao Li 已提交
1643
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1644 1645

  // set leader cache
M
Minghao Li 已提交
1646 1647
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1648
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1649 1650
    // maybe overwrite myself, no harm
    // just do it!
1651 1652 1653 1654 1655 1656 1657 1658 1659

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1660 1661
  }

S
Shengliang Guan 已提交
1662
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1663 1664
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1665 1666 1667
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1668 1669 1670
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1671
#if 0
1672 1673
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1674
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1675
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1676 1677 1678
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
1679
    }
1680
    (pMySender->privateTerm) += 100;
1681
  }
M
Minghao Li 已提交
1682
#endif
1683

1684 1685 1686 1687 1688
  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

M
Minghao Li 已提交
1689
  // stop elect timer
M
Minghao Li 已提交
1690
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1691

M
Minghao Li 已提交
1692 1693
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1694

M
Minghao Li 已提交
1695 1696
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1697

1698 1699 1700 1701 1702
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1703 1704 1705
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

M
Minghao Li 已提交
1706
  // trace log
1707
  sNInfo(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1708 1709 1710
}

// Candidate -> leader after winning the election (asserts candidate state and
// a granted-vote majority). Immediately appends a no-op in the new term and
// tries to advance the commit index (Raft 3.6.2, "Committing entries from
// previous terms"), then replicates if there are peers.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = (pSyncNode->replicaNum > 1);
  if (hasPeers) syncNodeReplicate(pSyncNode);
}

M
Minghao Li 已提交
1726 1727
// True when this sync node belongs to vgroup 1, which this module treats as the mnode group.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  bool isMnodeGroup = pSyncNode->vgId == 1;
  return isMnodeGroup;
}

M
Minghao Li 已提交
1728
// Reset the per-peer send bookkeeping (last sent index and time) in every
// replica slot, up to TSDB_MAX_REPLICA. Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  for (int32_t slot = 0; slot < TSDB_MAX_REPLICA; slot++) {
    pSyncNode->peerStates[slot].lastSendTime = 0;
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
  }
  return 0;
}

// Follower -> candidate. Only flips the role; it touches no term, vote or timer
// state. Asserts the node is currently a follower.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  sNTrace(pSyncNode, "follower to candidate");
}

// Leader -> follower via syncNodeBecomeFollower. Asserts the node is currently leader.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");
  sNTrace(pSyncNode, "leader to follower");
}

// Candidate -> follower via syncNodeBecomeFollower. Asserts the node is currently a candidate.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
  sNTrace(pSyncNode, "candidate to follower");
}

M
Minghao Li 已提交
1755 1756
// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1757
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1758 1759
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1760 1761 1762 1763

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1764
// simulate get vote from outside
M
Minghao Li 已提交
1765
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1766
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1767

S
Shengliang Guan 已提交
1768 1769
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1770
  if (ret != 0) return;
S
Shengliang Guan 已提交
1771 1772

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1773 1774 1775 1776 1777 1778 1779
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1780
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1781 1782
}

M
Minghao Li 已提交
1783
// return if has a snapshot
M
Minghao Li 已提交
1784 1785
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1786
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1787 1788
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1789 1790 1791 1792 1793 1794 1795
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1796 1797
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1798
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1799
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1800 1801
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1802 1803 1804 1805 1806 1807 1808
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1809 1810
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1811 1812
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1813 1814
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1815
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1816 1817
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1818 1819
    }

M
Minghao Li 已提交
1820 1821 1822
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1823 1824 1825 1826
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1827
  } else {
M
Minghao Li 已提交
1828 1829
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1830
  }
M
Minghao Li 已提交
1831

M
Minghao Li 已提交
1832 1833 1834 1835 1836 1837 1838
  return lastTerm;
}

// Write the last index and last term (both snapshot-aware) into *pLastIndex and
// *pLastTerm. Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex idx = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  trm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = idx;
  *pLastTerm = trm;
  return 0;
}
M
Minghao Li 已提交
1841

M
Minghao Li 已提交
1842
// return append-entries first try index
M
Minghao Li 已提交
1843 1844 1845 1846 1847
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1848 1849
// if index > 0, return index - 1
// else, return -1
1850 1851 1852 1853 1854 1855 1856 1857 1858
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1859 1860 1861 1862
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1863 1864 1865 1866 1867 1868 1869 1870 1871
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

1872 1873 1874
  SyncTerm  preTerm = 0;
  SyncIndex preIndex = index - 1;

1875
  SSyncRaftEntry* pPreEntry = NULL;
1876 1877 1878 1879 1880 1881 1882
  SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
  LRUHandle*      h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
  int32_t         code = 0;
  if (h) {
    pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    code = 0;

1883
    pSyncNode->pLogStore->cacheHit++;
1884 1885 1886
    sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);

  } else {
1887
    pSyncNode->pLogStore->cacheMiss++;
1888 1889 1890 1891
    sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);

    code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
  }
M
Minghao Li 已提交
1892 1893 1894 1895 1896 1897

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1898 1899 1900
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
1901 1902 1903 1904 1905 1906 1907

    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pPreEntry);
    }

1908 1909
    return preTerm;
  } else {
1910 1911 1912 1913
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1914 1915 1916 1917
      }
    }
  }

1918
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1919
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1920 1921
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1922 1923 1924 1925

// Write the index and term that precede `index` into *pPreIndex and *pPreTerm.
// Always returns 0; an unknown term shows up as SYNC_TERM_INVALID in *pPreTerm.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex prevIdx = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = prevIdx;

  SyncTerm prevTerm = syncNodeGetPreTerm(pSyncNode, index);
  *pPreTerm = prevTerm;

  return 0;
}

M
Minghao Li 已提交
1930
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1931 1932 1933 1934 1935
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1936
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1937 1938
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
M
Minghao Li 已提交
1939
      sError("failed to build ping msg");
S
Shengliang Guan 已提交
1940 1941
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1942
    }
M
Minghao Li 已提交
1943

M
Minghao Li 已提交
1944
    // sTrace("enqueue ping msg");
S
Shengliang Guan 已提交
1945 1946
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
M
Minghao Li 已提交
1947
      sError("failed to sync enqueue ping msg since %s", terrstr());
S
Shengliang Guan 已提交
1948 1949
      rpcFreeCont(rpcMsg.pCont);
      return;
1950 1951
    }

S
Shengliang Guan 已提交
1952
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1953 1954 1955 1956
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1957
  if (!syncIsInit()) return;
S
Shengliang Guan 已提交
1958

M
Minghao Li 已提交
1959 1960
  int64_t    rid = (int64_t)param;
  SSyncNode* pNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
1961

1962
  if (pNode == NULL) return;
M
Minghao Li 已提交
1963 1964 1965 1966 1967

  if (pNode->syncEqMsg == NULL) {
    syncNodeRelease(pNode);
    return;
  }
1968

1969
  int64_t tsNow = taosGetTimestampMs();
M
Minghao Li 已提交
1970 1971 1972 1973
  if (tsNow < pNode->electTimerParam.executeTime) {
    syncNodeRelease(pNode);
    return;
  }
1974

S
Shengliang Guan 已提交
1975
  SRpcMsg rpcMsg = {0};
1976 1977
  int32_t code =
      syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
1978

S
Shengliang Guan 已提交
1979
  if (code != 0) {
M
Minghao Li 已提交
1980
    sError("failed to build elect msg");
M
Minghao Li 已提交
1981
    syncNodeRelease(pNode);
S
Shengliang Guan 已提交
1982 1983 1984 1985
    return;
  }

  SyncTimeout* pTimeout = rpcMsg.pCont;
S
Shengliang Guan 已提交
1986
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
S
Shengliang Guan 已提交
1987 1988 1989

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
M
Minghao Li 已提交
1990
    sError("failed to sync enqueue elect msg since %s", terrstr());
S
Shengliang Guan 已提交
1991
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1992
    syncNodeRelease(pNode);
1993
    return;
S
Shengliang Guan 已提交
1994
  }
M
Minghao Li 已提交
1995 1996

  syncNodeRelease(pNode);
M
Minghao Li 已提交
1997 1998
}

M
Minghao Li 已提交
1999
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
2000
  if (!syncIsInit()) return;
2001

S
Shengliang Guan 已提交
2002 2003 2004 2005
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
2006
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
2007 2008 2009
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
M
Minghao Li 已提交
2010
        sError("failed to build heartbeat msg");
S
Shengliang Guan 已提交
2011
        return;
2012
      }
M
Minghao Li 已提交
2013

2014
      sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
S
Shengliang Guan 已提交
2015 2016
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
M
Minghao Li 已提交
2017
        sError("failed to enqueue heartbeat msg since %s", terrstr());
S
Shengliang Guan 已提交
2018 2019
        rpcFreeCont(rpcMsg.pCont);
        return;
2020
      }
S
Shengliang Guan 已提交
2021 2022 2023 2024

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

2025
    } else {
S
Shengliang Guan 已提交
2026 2027
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
2028
    }
M
Minghao Li 已提交
2029 2030 2031
  }
}

2032
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
2033
  int64_t hbDataRid = (int64_t)param;
2034
  int64_t tsNow = taosGetTimestampMs();
2035 2036 2037

  SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
  if (pData == NULL) {
M
Minghao Li 已提交
2038
    sError("hb timer get pData NULL, %" PRId64, hbDataRid);
2039 2040
    return;
  }
2041

2042
  SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
M
Minghao Li 已提交
2043
  if (pSyncNode == NULL) {
2044
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2045
    sError("hb timer get pSyncNode NULL");
2046 2047 2048 2049 2050 2051 2052 2053
    return;
  }

  SSyncTimer* pSyncTimer = pData->pTimer;

  if (!pSyncNode->isStart) {
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2054
    sError("vgId:%d, hb timer sync node already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2055 2056 2057
    return;
  }

M
Minghao Li 已提交
2058
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
2059 2060
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2061
    sError("vgId:%d, hb timer sync node not leader", pSyncNode->vgId);
M
Minghao Li 已提交
2062 2063 2064
    return;
  }

M
Minghao Li 已提交
2065
  if (pSyncNode->pRaftStore == NULL) {
2066 2067
    syncNodeRelease(pSyncNode);
    syncHbTimerDataRelease(pData);
M
Minghao Li 已提交
2068
    sError("vgId:%d, hb timer raft store already stop", pSyncNode->vgId);
M
Minghao Li 已提交
2069 2070 2071
    return;
  }

M
Minghao Li 已提交
2072
  // sTrace("vgId:%d, eq peer hb timer", pSyncNode->vgId);
2073 2074

  if (pSyncNode->replicaNum > 1) {
M
Minghao Li 已提交
2075 2076 2077
    int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
    int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

2078
    if (timerLogicClock == msgLogicClock) {
2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098
      if (tsNow > pData->execTime) {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
            "---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif

        pData->execTime += pSyncTimer->timerMS;

        SRpcMsg rpcMsg = {0};
        (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

        SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
        pSyncMsg->srcId = pSyncNode->myRaftId;
        pSyncMsg->destId = pData->destId;
        pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
        pSyncMsg->commitIndex = pSyncNode->commitIndex;
        pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
        pSyncMsg->privateTerm = 0;
2099
        pSyncMsg->timeStamp = tsNow;
2100 2101 2102 2103 2104 2105

        // update reset time
        int64_t timerElapsed = tsNow - pSyncTimer->timeStamp;
        pSyncTimer->timeStamp = tsNow;

        // send msg
2106 2107
        syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
        syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
2108 2109 2110 2111 2112 2113 2114 2115
      } else {
#if 0        
        sTrace(
            "vgId:%d, hbDataRid:%ld,  pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
            pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
      }

M
Minghao Li 已提交
2116 2117
      if (syncIsInit()) {
        // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
2118 2119
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid,
                     syncEnv()->pTimerManager, &pSyncTimer->pTimer);
M
Minghao Li 已提交
2120 2121 2122 2123
      } else {
        sError("sync env is stop, reset peer hb timer error");
      }

2124
    } else {
M
Minghao Li 已提交
2125 2126
      sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId,
             timerLogicClock, msgLogicClock);
2127 2128
    }
  }
2129 2130 2131

  syncHbTimerDataRelease(pData);
  syncNodeRelease(pSyncNode);
2132 2133
}

2134 2135 2136 2137 2138
// Propose a no-op entry by enqueueing it as a client request.
// Only a leader may propose: on other nodes terrno is set to
// TSDB_CODE_SYN_NOT_LEADER and -1 is returned.
// Returns 0 on success. Returns -1 if the entry or request cannot be built,
// otherwise the enqueue error code. On every failure after the request is
// built, the request payload is freed here, because the queue did not take it.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);
  if (code != 0) {
    sError("failed to build noop msg");
    rpcFreeCont(rpcMsg.pCont);
    return -1;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("failed to propose noop msg while enqueue since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

2158 2159
// Deleter registered with the LRU cache for raft entries (see syncCacheEntry):
// frees the cached value. The key is not owned here.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

2160 2161 2162 2163
// Insert pEntry into the log store's LRU cache.
//  - key: the entry's index;
//  - charge: sizeof(*pEntry) + dataLen;
//  - priority: low;
//  - deleter: deleteCacheEntry.
// *h receives the cache handle. Callers in this file release that handle
// instead of destroying the entry when it is set.
// Returns 0 on success, -1 if the cache rejects the insert.
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  SSyncLogStoreData* pData = pLogStore->data;
  sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);

  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus st = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                    deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (st == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

2175
// True when at least `quorum` peers have a recorded heartbeat-reply time (kept
// in pMatchIndex) older than SYNC_HEART_TIMEOUT_MS. Peers whose recorded time is
// 0 or -1 (presumably "never heard from") are not counted. A single-replica
// group always returns false.
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) return false;

  int64_t tsNow = taosGetTimestampMs();
  int32_t timedOut = 0;

  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
    bool    noRecord = (recvTime == 0 || recvTime == -1);
    if (!noRecord && tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) {
      timedOut++;
    }
  }

  return timedOut >= pSyncNode->quorum;
}

2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216
// True if any snapshot sender in the first replicaNum slots exists and is
// started. False for a NULL node.
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) return false;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) {
      return true;
    }
  }
  return false;
}

// True if the new-node snapshot receiver exists and is started. False for a NULL node.
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL || pSyncNode->pNewNodeReceiver == NULL) return false;
  return pSyncNode->pNewNodeReceiver->start ? true : false;
}

M
Minghao Li 已提交
2217 2218 2219
// Append a no-op entry for the current term at the log's write index. Only
// leaders append; on other nodes the entry is built and discarded.
// On a successful append the entry is put into the LRU cache, and the cache
// handle is released afterwards. If the entry was not cached, it is destroyed
// here.
// Returns 0 on success, -1 if the entry cannot be built or the append fails.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);
  if (pEntry == NULL) return -1;  // in case ASSERT is compiled out

  LRUHandle* h = NULL;

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sError("append noop error");
      // Not cached yet, so we still own the entry (same as syncNodeOnClientRequest).
      syncEntryDestory(pEntry);
      return -1;
    }

    syncCacheEntry(ths->pLogStore, pEntry, &h);
  }

  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return 0;
}

S
Shengliang Guan 已提交
2246 2247
// Handle a heartbeat from a peer.
//
// Always answers with a SyncHeartbeatReply carrying our current term and the
// local receive timestamp. Depending on term and role, it also:
//  - same term, and we are not leader: records the receive time in pNextIndex,
//    re-arms the election timer and adopts the sender's minMatchIndex. If we are
//    a follower, it also enqueues a local FOLLOWER_CMT command carrying the
//    sender's commitIndex.
//  - term >= ours, and we are not a follower: enqueues a local STEP_DOWN command
//    with the sender's term.
// The local commands are queued rather than executed inline, presumably to be
// handled by syncNodeOnLocalCmd. A queued message belongs to the queue once
// enqueueing succeeds, so it is never read afterwards. It is freed here whenever
// it was not handed off.
//
// Returns 0, or -1 if the reply message could not be built. The local
// bookkeeping above is performed either way.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;

  int64_t tsMs = taosGetTimestampMs();
  int64_t timeDiff = tsMs - pMsg->timeStamp;
  syncLogRecvHeartbeat(ths, pMsg, timeDiff);

  SRpcMsg             rpcMsg = {0};
  SyncHeartbeatReply* pMsgReply = NULL;
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) == 0 && rpcMsg.pCont != NULL) {
    pMsgReply = rpcMsg.pCont;
    pMsgReply->destId = pMsg->srcId;
    pMsgReply->srcId = ths->myRaftId;
    pMsgReply->term = ths->pRaftStore->currentTerm;
    pMsgReply->privateTerm = 8864;  // magic number
    pMsgReply->timeStamp = tsMs;
  } else {
    sError("vgId:%d, failed to build heartbeat reply", ths->vgId);
  }

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
        sError("vgId:%d, failed to build fc-commit msg", ths->vgId);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;
        SyncIndex fcIndex = pSyncMsg->fcIndex;  // copy: pSyncMsg is off-limits once queued

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, fcIndex);
          }
        } else {
          rpcFreeCont(rpcMsgLocalCmd.pCont);  // never handed off
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0 || rpcMsgLocalCmd.pCont == NULL) {
      sError("vgId:%d, failed to build step-down msg", ths->vgId);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;
      SyncTerm sdNewTerm = pSyncMsg->sdNewTerm;  // copy: pSyncMsg is off-limits once queued

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, sdNewTerm);
        }
      } else {
        rpcFreeCont(rpcMsgLocalCmd.pCont);  // never handed off
      }
    }
  }

  if (pMsgReply == NULL) return -1;

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2323 2324
// Handle a heartbeat reply: log it along with the round-trip delta, and stamp
// the sender's receive time in pMatchIndex. syncNodeHeartbeatReplyTimeout later
// reads that timestamp to judge peer liveness. Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pReply = pRpcMsg->pCont;
  int64_t             now = taosGetTimestampMs();

  syncLogRecvHeartbeatReply(ths, pReply, now - pReply->timeStamp);
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pReply->srcId, now);

  return 0;
}

S
Shengliang Guan 已提交
2335 2336
// Execute a queued local command:
//  - STEP_DOWN: calls syncNodeStepDown with sdNewTerm;
//  - FOLLOWER_CMT: calls syncNodeFollowerCommit with fcIndex;
//  - anything else: logs an error.
// Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pCmd = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pCmd, "");

  if (pCmd->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pCmd->sdNewTerm);
    return 0;
  }

  if (pCmd->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pCmd->fcIndex);
    return 0;
  }

  sError("error local cmd");
  return 0;
}

M
Minghao Li 已提交
2352 2353 2354 2355 2356 2357 2358 2359 2360 2361
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2362

2363
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
S
Shengliang Guan 已提交
2364
  sNTrace(ths, "on client request");
2365

M
Minghao Li 已提交
2366
  int32_t ret = 0;
2367
  int32_t code = 0;
M
Minghao Li 已提交
2368

M
Minghao Li 已提交
2369
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
2370
  SyncTerm        term = ths->pRaftStore->currentTerm;
2371 2372 2373 2374 2375 2376 2377
  SSyncRaftEntry* pEntry;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }
M
Minghao Li 已提交
2378

2379 2380
  LRUHandle* h = NULL;

M
Minghao Li 已提交
2381
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
2382 2383 2384
    // append entry
    code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
2385 2386 2387 2388 2389 2390
      if (ths->replicaNum == 1) {
        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
2391

2392 2393 2394 2395
        return -1;

      } else {
        // del resp mgr, call FpCommitCb
2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
2407
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
2408 2409 2410 2411 2412 2413 2414

        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }

2415 2416
        return -1;
      }
2417
    }
M
Minghao Li 已提交
2418

2419 2420
    syncCacheEntry(ths->pLogStore, pEntry, &h);

2421 2422
    // if mulit replica, start replicate right now
    if (ths->replicaNum > 1) {
M
Minghao Li 已提交
2423
      syncNodeReplicate(ths);
2424
    }
2425

2426 2427
    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
2428 2429 2430 2431 2432
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
2433
    }
M
Minghao Li 已提交
2434 2435
  }

2436 2437 2438 2439 2440 2441 2442 2443
  if (pRetIndex != NULL) {
    if (ret == 0 && pEntry != NULL) {
      *pRetIndex = pEntry->index;
    } else {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
  }

2444 2445 2446 2447 2448 2449
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

M
Minghao Li 已提交
2450
  return ret;
2451
}
M
Minghao Li 已提交
2452

S
Shengliang Guan 已提交
2453 2454 2455
// Map a sync role to its lowercase name for logging.
// Unknown values map to "error".
const char* syncStr(ESyncState state) {
  const char* name = "error";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    default:
      break;
  }

  return name;
}
2465

2466
#if 0
// NOTE: compiled out. Its former call site in syncNodeDoCommit is also inside #if 0.
//
// Leader-transfer handling for a committed leader-transfer entry.
//
// Guards: the node must be a follower that has finished restore, and the entry must
// not be from an older term nor behind the local last index. If any guard fails,
// this is a no-op that returns 0.
//
// If this node is the requested new leader (same raft id, or same fqdn and port),
// its election timer is restarted with a 1 ms timeout. After that,
// FpLeaderTransferCb is invoked when registered. Always returns 0.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }
  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }
  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }
  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool idMatches = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool addrMatches = (pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) &&
                     (strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0);

  if (idMatches || addrMatches) {
    // reset elect timer now, with a 1 ms timeout
    int32_t ret = syncNodeRestartElectTimer(ths, 1);
    ASSERT(ret == 0);

    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb == NULL) {
    return 0;
  }

  SFsmCbMeta cbMeta = {
      .code = 0,
      .currentTerm = ths->pRaftStore->currentTerm,
      .flag = 0,
      .index = pEntry->index,
      .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
      .isWeak = pEntry->isWeak,
      .seqNum = pEntry->seqNum,
      .state = ths->state,
      .term = pEntry->term,
  };
  ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  return 0;
}
#endif

2533
// Locate this node inside pNewCfg and record its position in pNewCfg->myIndex.
//
// Each configured node's fqdn and port are converted to a raft address, then
// compared, together with this node's vgId, against myRaftId.
// Returns 0 when found, or -1 when this node is not part of pNewCfg
// (myIndex is then left untouched).
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t slot = 0;

  while (slot < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[slot].nodeFqdn, pNewCfg->nodeInfo[slot].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = slot;
      return 0;
    }
    ++slot;
  }

  return -1;
}

2548 2549 2550 2551
// True when the message qualifies for the single-replica fast path. That requires
// all of the following:
//   - this node has exactly one replica;
//   - the node's vgId is not 1;
//   - pMsg's type is a user-commit type according to syncUtilUserCommit.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1 || ths->vgId == 1) {
    return false;
  }
  return syncUtilUserCommit(pMsg->msgType);
}

M
Minghao Li 已提交
2552
// Apply the log entries in [beginIndex, endIndex] to the state machine.
//
// Steps:
//   1. If the FSM reports a snapshot whose lastApplyIndex covers beginIndex,
//      beginIndex is moved past that snapshot.
//   2. Each remaining index is taken from the log-store LRU cache when present,
//      or read from the log store otherwise. Entries that cannot be read are
//      logged and skipped.
//   3. User-commit entries are passed to FpCommitCb with SFsmCbMeta.flag = flag.
//      Exception: on a single-replica node with vgId != 1 whose restore has
//      finished, they are not executed here.
//   4. The first time the applied entry equals the log store's last index,
//      FpRestoreFinishCb is invoked (if set) and restoreFinish becomes true.
//
// Returns 0. Returns -1 only when ths is NULL and the range is non-empty.
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first: entries up to lastApplyIndex are
    // already contained in the snapshot
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // without an FSM there is nothing to execute
  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
    if (i == SYNC_INDEX_INVALID) {
      continue;
    }

    // Initialised so the NULL check below never reads an indeterminate pointer,
    // even if syncLogGetEntry leaves it unset.
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = ths->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));

    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
      ths->pLogStore->cacheHit++;
      sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
    } else {
      ths->pLogStore->cacheMiss++;
      sNTrace(ths, "miss cache index:%" PRId64, i);

      int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
      if (code != 0 || pEntry == NULL) {
        sNError(ths, "get log entry error");
        sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
        continue;
      }
    }

    SRpcMsg rpcMsg = {0};
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));

    // user commit
    if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
      bool internalExecute = true;
      if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
        internalExecute = false;
      }

      sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
              TMSG_INFO(pEntry->msgType));

      // execute fsm in apply thread, or execute outside syncPropose
      if (internalExecute) {
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
            .isWeak = pEntry->isWeak,
            .code = 0,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = flag,
        };

        syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
        ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
      }
    }

    // Leader-transfer entries are not handled here (they execute in pre-commit);
    // syncDoLeaderTransfer is compiled out.

    // restore finish
    // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
      if (ths->restoreFinish == false) {
        if (ths->pFsm->FpRestoreFinishCb != NULL) {
          ths->pFsm->FpRestoreFinishCb(ths->pFsm);
        }
        ths->restoreFinish = true;

        int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
        sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
      }
    }

    rpcFreeCont(rpcMsg.pCont);

    // Release the cache handle if the entry came from the cache, otherwise free it.
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestory(pEntry);
    }
  }

  return 0;
}

// Return true if pRaftId matches any entry of ths->replicasId[0 .. replicaNum).
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;

  for (int32_t i = 0; !found && i < ths->replicaNum; ++i) {
    found = syncUtilSameId(&ths->replicasId[i], pRaftId);
  }

  return found;
}

// Return the snapshot sender for the replica whose id equals pDestId, or NULL when
// no replica matches. If several slots match, the last one wins.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      hit = i;  // keep scanning: the last matching slot is reported
    }
  }

  return (hit >= 0) ? ths->senders[hit] : NULL;
}
M
Minghao Li 已提交
2692

2693 2694
// Return a pointer to the per-peer heartbeat timer slot for the replica whose id
// equals pDestId, or NULL when no replica matches. If several slots match, the
// last one wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      hit = i;  // keep scanning: the last matching slot is reported
    }
  }

  return (hit >= 0) ? &ths->peerHeartbeatTimerArr[hit] : NULL;
}

M
Minghao Li 已提交
2703 2704
// Return a pointer to the peer-state slot for the replica whose id equals pDestId,
// or NULL when no replica matches. If several slots match, the last one wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  int32_t hit = -1;

  for (int32_t i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(pDestId, &ths->replicasId[i])) {
      hit = i;  // keep scanning: the last matching slot is reported
    }
  }

  return (hit >= 0) ? &ths->peerStates[hit] : NULL;
}

// Decide whether pMsg should be sent to pDestId.
//
// Returns false in two cases:
//   - the destination has no peer state (an error is logged);
//   - the same next index (prevLogIndex + 1) was already sent less than
//     SYNC_APPEND_ENTRIES_TIMEOUT_MS ago.
// Returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool sentRecently = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(sameIndex && sentRecently);
}

M
Minghao Li 已提交
2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743
// Return true if the node may start a change. Each failing check logs a distinct
// error and returns false. The node may not change when any of these hold:
//   1. the node is already flagged as `changing`;
//   2. the commit index is valid (>= SYNC_INDEX_BEGIN) but differs from the last
//      log index;
//   3. any peer has a snapshot sender whose `start` flag is set.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN) {
    SyncIndex lastIdx = syncNodeGetLastIndex(pSyncNode);
    bool      caughtUp = (pSyncNode->commitIndex == lastIdx);
    if (!caughtUp) {
      sError("sync cannot change2");
      return false;
    }
  }

  for (int32_t p = 0; p < pSyncNode->peersNum; ++p) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &pSyncNode->peersId[p]);
    bool                 sending = (pSender != NULL) && pSender->start;
    if (sending) {
      sError("sync cannot change3");
      return false;
    }
  }

  return true;
}