syncMain.c 120.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
M
Minghao Li 已提交
45

46
// Opens a sync node from pSyncInfo, registers it through syncNodeAdd(), and
// copies the timer settings and message callbacks from pSyncInfo to the node.
// Returns the node's rid on success, or -1 if the node could not be opened
// or registered (in the latter case the node is closed again).
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (NULL == pNode) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    syncNodeClose(pNode);
    return -1;
  }

  // ping timer
  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  // election timer
  pNode->electBaseLine = pSyncInfo->electMs;

  // heartbeat timer
  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  pNode->msgcb = pSyncInfo->msgcb;

  return pNode->rid;
}
M
Minghao Li 已提交
67

M
Minghao Li 已提交
68
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
69 70 71 72
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
73 74 75
  }
}

M
Minghao Li 已提交
76
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
77
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
78
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
79
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
80
    syncNodeRemove(rid);
M
Minghao Li 已提交
81
  }
S
Shengliang Guan 已提交
82
}
M
Minghao Li 已提交
83

S
Shengliang Guan 已提交
84 85 86
// Returns true when pCfg is acceptable as a new configuration: the node must
// be in pCfg (per syncNodeInConfig) and the replica count may differ from the
// node's current replica count by at most one.
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = abs(pCfg->replicaNum - pSyncNode->replicaNum);
    return delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
89
// Applies a new configuration to the sync node identified by rid.
//
// Returns 0 on success. Returns -1 when the node cannot be acquired, or when
// pNewCfg is rejected by syncNodeCheckNewConfig() (terrno is then set to
// TSDB_CODE_SYN_NEW_CONFIG_ERROR).
//
// When the node is the leader, its heartbeat timers are stopped, every
// per-peer heartbeat timer slot is re-initialized, the heartbeat timers are
// started again and replication is triggered.
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    // Log while the reference is still held: the node must not be
    // dereferenced once it has been released.
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
117

S
Shengliang Guan 已提交
118 119 120 121
// Dispatches pMsg to the handler that matches its msgType.
//
// For every known type the typed message is built with the matching
// *FromRpcMsg2 helper, passed to the handler, and destroyed afterwards.
// Returns the handler's result, or -1 when sync is not initialized, the node
// cannot be acquired, or the message type is not recognized.
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  if (!syncIsInit()) return -1;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  int32_t code = -1;

  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT: {
      SyncHeartbeat* pHb = syncHeartbeatFromRpcMsg2(pMsg);
      code = syncNodeOnHeartbeat(pSyncNode, pHb);
      syncHeartbeatDestroy(pHb);
      break;
    }
    case TDMT_SYNC_HEARTBEAT_REPLY: {
      SyncHeartbeatReply* pHbReply = syncHeartbeatReplyFromRpcMsg2(pMsg);
      code = syncNodeOnHeartbeatReply(pSyncNode, pHbReply);
      syncHeartbeatReplyDestroy(pHbReply);
      break;
    }
    case TDMT_SYNC_TIMEOUT: {
      SyncTimeout* pTimeout = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimer(pSyncNode, pTimeout);
      syncTimeoutDestroy(pTimeout);
      break;
    }
    case TDMT_SYNC_PING: {
      SyncPing* pPing = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPing(pSyncNode, pPing);
      syncPingDestroy(pPing);
      break;
    }
    case TDMT_SYNC_PING_REPLY: {
      SyncPingReply* pPingReply = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReply(pSyncNode, pPingReply);
      syncPingReplyDestroy(pPingReply);
      break;
    }
    case TDMT_SYNC_CLIENT_REQUEST: {
      SyncClientRequest* pRequest = syncClientRequestFromRpcMsg2(pMsg);
      code = syncNodeOnClientRequest(pSyncNode, pRequest, NULL);
      syncClientRequestDestroy(pRequest);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE: {
      SyncRequestVote* pVote = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVote(pSyncNode, pVote);
      syncRequestVoteDestroy(pVote);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply* pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReply(pSyncNode, pVoteReply);
      syncRequestVoteReplyDestroy(pVoteReply);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries* pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntries(pSyncNode, pAppend);
      syncAppendEntriesDestroy(pAppend);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply* pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReply(pSyncNode, pAppendReply);
      syncAppendEntriesReplyDestroy(pAppendReply);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_SEND: {
      SyncSnapshotSend* pSnapSend = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshot(pSyncNode, pSnapSend);
      syncSnapshotSendDestroy(pSnapSend);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_RSP: {
      SyncSnapshotRsp* pSnapRsp = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotReply(pSyncNode, pSnapRsp);
      syncSnapshotRspDestroy(pSnapRsp);
      break;
    }
    case TDMT_SYNC_LOCAL_CMD: {
      SyncLocalCmd* pLocalCmd = syncLocalCmdFromRpcMsg2(pMsg);
      code = syncNodeOnLocalCmd(pSyncNode, pLocalCmd);
      syncLocalCmdDestroy(pLocalCmd);
      break;
    }
    default:
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      code = -1;
      break;
  }

  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
186
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
187
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
188
  if (pSyncNode == NULL) return -1;
189

S
Shengliang Guan 已提交
190
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
191
  syncNodeRelease(pSyncNode);
192 193 194
  return ret;
}

M
Minghao Li 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
// Returns the smallest match index recorded for any peer of pSyncNode, or
// SYNC_INDEX_INVALID when the node has no peers.
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum <= 0) {
    return SYNC_INDEX_INVALID;
  }

  // Seed with the first peer, then fold the remaining peers into the minimum.
  SyncIndex lowest = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[0]);
  for (int32_t peer = 1; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex current = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    lowest = (current < lowest) ? current : lowest;
  }

  return lowest;
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224
char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
  int32_t len = 128;
  int32_t useLen = 0;
  int32_t leftLen = len - useLen;
  char*   pStr = taosMemoryMalloc(len);
  memset(pStr, 0, len);

  char*   p = pStr;
  int32_t use = snprintf(p, leftLen, "{");
  useLen += use;
  leftLen -= use;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SPeerState* pState = syncNodeGetPeerState((SSyncNode*)pSyncNode, &(pSyncNode->replicasId[i]));
M
Minghao Li 已提交
225
    if (pState == NULL) {
226
      sError("vgId:%d, replica maybe dropped", pSyncNode->vgId);
M
Minghao Li 已提交
227 228
      break;
    }
229 230

    p = pStr + useLen;
S
Shengliang Guan 已提交
231
    use = snprintf(p, leftLen, "%d:%" PRId64 " ,%" PRId64, i, pState->lastSendIndex, pState->lastSendTime);
232 233 234 235 236 237 238 239 240 241 242 243 244 245
    useLen += use;
    leftLen -= use;
  }

  p = pStr + useLen;
  use = snprintf(p, leftLen, "}");
  useLen += use;
  leftLen -= use;

  // sTrace("vgId:%d, ------------------ syncNodePeerState2Str:%s", pSyncNode->vgId, pStr);

  return pStr;
}

246
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
247
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
248 249 250 251 252 253 254
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);
  int32_t code = 0;

M
Minghao Li 已提交
255
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
256 257 258
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
259 260 261
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
262 263 264
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
M
Minghao Li 已提交
265
      char logBuf[256];
S
Shengliang Guan 已提交
266 267 268
      snprintf(logBuf, sizeof(logBuf),
               "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex,
               logNum, isEmpty);
M
Minghao Li 已提交
269 270
      syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
271
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
272 273 274
      return 0;
    }

M
Minghao Li 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
              char logBuf[256];
              snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
294 295
                       "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                       " of %s:%d, do not delete wal",
M
Minghao Li 已提交
296 297 298 299
                       lastApplyIndex, matchIndex, host, port);
              syncNodeEventLog(pSyncNode, logBuf);
            } while (0);

S
Shengliang Guan 已提交
300
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
301 302 303 304 305 306 307 308
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
          char logBuf[256];
          snprintf(logBuf, sizeof(logBuf),
S
Shengliang Guan 已提交
309 310
                   "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                   lastApplyIndex, pSyncNode->minMatchIndex);
M
Minghao Li 已提交
311 312
          syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
313
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
314 315 316 317
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
318
        char logBuf[256];
S
Shengliang Guan 已提交
319
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
320 321
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
322
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
323 324 325 326
        return 0;

      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
327 328
        snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " unknown state, do not delete wal",
                 lastApplyIndex);
M
Minghao Li 已提交
329 330
        syncNodeEventLog(pSyncNode, logBuf);

S
Shengliang Guan 已提交
331
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
332 333 334 335 336 337 338 339 340
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
341 342 343
    }
  }

M
Minghao Li 已提交
344
_DEL_WAL:
345

M
Minghao Li 已提交
346
  do {
347 348 349 350
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
351
      pSyncNode->snapshottingTime = taosGetTimestampMs();
352

M
Minghao Li 已提交
353 354 355
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
356
        char logBuf[256];
S
Shengliang Guan 已提交
357
        snprintf(logBuf, sizeof(logBuf), "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
358 359 360
                 pSyncNode->snapshottingIndex, lastApplyIndex);
        syncNodeEventLog(pSyncNode, logBuf);

M
Minghao Li 已提交
361 362
      } else {
        char logBuf[256];
S
Shengliang Guan 已提交
363 364 365
        snprintf(logBuf, sizeof(logBuf),
                 "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64, terrstr(terrno),
                 pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
366 367 368 369
        syncNodeErrorLog(pSyncNode, logBuf);

        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
370 371

    } else {
372
      char logBuf[256];
S
Shengliang Guan 已提交
373 374 375
      snprintf(logBuf, sizeof(logBuf),
               "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64, snapshottingIndex,
               lastApplyIndex);
376
      syncNodeEventLog(pSyncNode, logBuf);
377
    }
M
Minghao Li 已提交
378
  } while (0);
379

S
Shengliang Guan 已提交
380
  syncNodeRelease(pSyncNode);
381 382 383 384
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
385
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
386 387 388 389 390 391
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

392 393 394 395
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
396
    if (code != 0) {
397
      sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr());
M
Minghao Li 已提交
398

S
Shengliang Guan 已提交
399
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
400 401 402 403
      return -1;
    } else {
      do {
        char logBuf[256];
S
Shengliang Guan 已提交
404 405
        snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%" PRId64,
                 atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
406 407
        syncNodeEventLog(pSyncNode, logBuf);
      } while (0);
408

M
Minghao Li 已提交
409 410
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
411
  }
412

S
Shengliang Guan 已提交
413
  syncNodeRelease(pSyncNode);
414 415 416
  return code;
}

M
Minghao Li 已提交
417
// Calls syncNodeStepDown() with newTerm on the node identified by rid.
// Returns 0, or -1 with terrno = TSDB_CODE_SYN_INTERNAL_ERROR when the node
// cannot be acquired.
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pNode->rid);

  syncNodeStepDown(pNode, newTerm);
  syncNodeRelease(pNode);

  return 0;
}

M
Minghao Li 已提交
431 432
// Starts a leader transfer to the first entry of peersNodeInfo.
// Returns the result of syncNodeLeaderTransferTo(), or -1 with
// terrno = TSDB_CODE_SYN_ONE_REPLICA when the node has no peers.
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum != 0) {
    return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[0]);
  }

  sDebug("only one replica, cannot leader transfer");
  terrno = TSDB_CODE_SYN_ONE_REPLICA;
  return -1;
}

// Proposes a leader-transfer message that names newLeader as the new leader.
//
// Returns the result of syncNodePropose(). Returns -1 when the node has a
// single replica (terrno = TSDB_CODE_SYN_ONE_REPLICA) or when the transfer
// message cannot be built (terrno = TSDB_CODE_SYN_INTERNAL_ERROR).
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  do {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
    syncNodeEventLog(pSyncNode, logBuf);
  } while (0);

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Validate before the first dereference; checking afterwards is too late.
  if (pMsg == NULL) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

471
bool syncCanLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
472
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
473 474 475
  if (pSyncNode == NULL) {
    return false;
  }
M
Minghao Li 已提交
476
  ASSERT(rid == pSyncNode->rid);
477 478

  if (pSyncNode->replicaNum == 1) {
S
Shengliang Guan 已提交
479
    syncNodeRelease(pSyncNode);
480 481 482 483
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
S
Shengliang Guan 已提交
484
    syncNodeRelease(pSyncNode);
485 486 487 488 489 490 491 492 493 494 495 496 497 498
    return true;
  }

  bool matchOK = true;
  if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncIndex myCommitIndex = pSyncNode->commitIndex;
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
      if (peerMatchIndex < myCommitIndex) {
        matchOK = false;
      }
    }
  }

S
Shengliang Guan 已提交
499
  syncNodeRelease(pSyncNode);
500 501 502
  return matchOK;
}

503
// Thin wrapper: forwards its arguments to syncPropose() and returns its result.
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
507

508 509
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
510

S
Shengliang Guan 已提交
511
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
512 513 514 515
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
516 517
  }

518
  return state;
M
Minghao Li 已提交
519 520
}

521
#if 0
522 523 524 525 526
// Fills pSnapshot for the log entry at index: data is set to NULL,
// lastApplyIndex to index, lastApplyTerm to the entry's term, and
// lastConfigIndex via syncNodeGetSnapshotConfigIndex().
// Returns 0 on success, -1 when index is below SYNC_INDEX_BEGIN, the node
// cannot be acquired, or the entry cannot be read from the log store.
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) {
    return -1;
  }

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) {
    return -1;
  }
  ASSERT(rid == pNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) != 0) {
    if (pEntry != NULL) {
      syncEntryDestory(pEntry);
    }
    syncNodeRelease(pNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pNode);

  return 0;
}

554
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
S
Shengliang Guan 已提交
555
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
556 557 558
  if (pSyncNode == NULL) {
    return -1;
  }
M
Minghao Li 已提交
559
  ASSERT(rid == pSyncNode->rid);
560 561
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

S
Shengliang Guan 已提交
562
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
563

S
Shengliang Guan 已提交
564
  syncNodeRelease(pSyncNode);
565 566 567
  return 0;
}

568
// Stores in sMeta->lastConfigIndex the largest recorded config index that
// does not exceed snapshotIndex; the first recorded config index is used when
// no larger one qualifies.
// Returns 0, or -1 when the node cannot be acquired.
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    return -1;
  }
  ASSERT(rid == pNode->rid);

  ASSERT(pNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex best = pNode->pRaftCfg->configIndexArr[0];

  // Slot 0 is the seed and can never be greater than itself, so start at 1.
  for (int slot = 1; slot < pNode->pRaftCfg->configIndexCount; ++slot) {
    SyncIndex candidate = pNode->pRaftCfg->configIndexArr[slot];
    if (candidate > best && candidate <= snapshotIndex) {
      best = candidate;
    }
  }

  sMeta->lastConfigIndex = best;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pNode);
  return 0;
}
591
#endif
592

593 594 595 596 597 598 599 600 601 602
// Returns the largest recorded config index that does not exceed
// snapshotLastApplyIndex; the first recorded config index is returned when no
// larger one qualifies. At least one config index must be recorded.
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  SyncIndex best = pSyncNode->pRaftCfg->configIndexArr[0];

  // Slot 0 is the seed and can never be greater than itself, so start at 1.
  for (int slot = 1; slot < pSyncNode->pRaftCfg->configIndexCount; ++slot) {
    SyncIndex candidate = pSyncNode->pRaftCfg->configIndexArr[slot];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);

  return best;
}

609
#if 0
M
Minghao Li 已提交
610
SyncTerm syncGetMyTerm(int64_t rid) {
S
Shengliang Guan 已提交
611
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
612 613 614
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
615
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
616
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;
M
Minghao Li 已提交
617

S
Shengliang Guan 已提交
618
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
619
  return term;
M
Minghao Li 已提交
620 621
}

622
SyncIndex syncGetLastIndex(int64_t rid) {
S
Shengliang Guan 已提交
623
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
624 625 626 627 628 629
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);

S
Shengliang Guan 已提交
630
  syncNodeRelease(pSyncNode);
631 632 633 634
  return lastIndex;
}

SyncIndex syncGetCommitIndex(int64_t rid) {
S
Shengliang Guan 已提交
635
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
636 637 638 639 640 641
  if (pSyncNode == NULL) {
    return SYNC_INDEX_INVALID;
  }
  ASSERT(rid == pSyncNode->rid);
  SyncIndex cmtIndex = pSyncNode->commitIndex;

S
Shengliang Guan 已提交
642
  syncNodeRelease(pSyncNode);
643 644 645
  return cmtIndex;
}

M
Minghao Li 已提交
646
SyncGroupId syncGetVgId(int64_t rid) {
S
Shengliang Guan 已提交
647
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
648
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
649 650
    return TAOS_SYNC_STATE_ERROR;
  }
M
Minghao Li 已提交
651
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
652
  SyncGroupId vgId = pSyncNode->vgId;
M
Minghao Li 已提交
653

S
Shengliang Guan 已提交
654
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
655
  return vgId;
M
Minghao Li 已提交
656 657
}

M
Minghao Li 已提交
658
// Fills pEpSet with one endpoint (fqdn, port) per replica in the node's raft
// configuration and sets inUse to the configuration's myIndex.
// When the node cannot be acquired, *pEpSet is zeroed instead.
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (NULL == pNode) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pNode->rid);

  pEpSet->numOfEps = 0;
  for (int ep = 0; ep < pNode->pRaftCfg->cfg.replicaNum; ++ep) {
    snprintf(pEpSet->eps[ep].fqdn, sizeof(pEpSet->eps[ep].fqdn), "%s", pNode->pRaftCfg->cfg.nodeInfo[ep].nodeFqdn);
    pEpSet->eps[ep].port = pNode->pRaftCfg->cfg.nodeInfo[ep].nodePort;
    (pEpSet->numOfEps)++;
    sInfo("vgId:%d, sync get epset: index:%d %s:%d", pNode->vgId, ep, pEpSet->eps[ep].fqdn, pEpSet->eps[ep].port);
  }

  pEpSet->inUse = pNode->pRaftCfg->cfg.myIndex;
  sInfo("vgId:%d, sync get epset in-use:%d", pNode->vgId, pEpSet->inUse);

  syncNodeRelease(pNode);
}
677
#endif
M
Minghao Li 已提交
678

679
// Fills pEpSet with one endpoint (fqdn, port) per replica in the node's raft
// configuration and points inUse at the entry following this node's own
// index (wrapping around), i.e. the endpoint to retry against.
//
// When the node cannot be acquired, *pEpSet is zeroed. When the configuration
// holds no replicas, numOfEps and inUse are both 0.
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  ASSERT(rid == pSyncNode->rid);

  pEpSet->numOfEps = 0;
  // Give inUse a defined value even when the loop below adds no endpoint;
  // it is logged and returned to the caller either way.
  pEpSet->inUse = 0;

  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
    pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
    (pEpSet->numOfEps)++;
    sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
          pEpSet->eps[i].port);
  }

  // Guard against a modulo by zero when no endpoint was added.
  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }
  sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);

  syncNodeRelease(pSyncNode);
}
S
Shengliang Guan 已提交
701
// Looks up and removes the response stub stored under index in the node's
// response manager. When syncRespMgrGetAndDel() returns 1, the stub's RPC
// handle info is copied to *pInfo; otherwise *pInfo is left untouched.
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
  SRespStub respStub;

  if (syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &respStub) == 1) {
    *pInfo = respStub.rpcMsg.info;
  }

  sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
}

char* sync2SimpleStr(int64_t rid) {
S
Shengliang Guan 已提交
712
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
M
Minghao Li 已提交
713
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
714
    sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid);
M
Minghao Li 已提交
715 716
    return NULL;
  }
M
Minghao Li 已提交
717
  ASSERT(rid == pSyncNode->rid);
M
Minghao Li 已提交
718
  char* s = syncNode2SimpleStr(pSyncNode);
S
Shengliang Guan 已提交
719
  syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
720 721 722 723

  return s;
}

M
Minghao Li 已提交
724
// Proposes pMsg on the sync node identified by rid.
//
// Returns the result of syncNodePropose(), or -1 with
// terrno = TSDB_CODE_SYN_INTERNAL_ERROR when the node cannot be acquired.
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    // Nothing was acquired, so there is nothing to release here.
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
  ASSERT(rid == pSyncNode->rid);

  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
  syncNodeRelease(pSyncNode);
  return ret;
}

738
// Tells whether an array of messages may be handled as one batch.
// Returns false as soon as any of the `arrSize` messages is a config-change
// or config-change-finish message; returns true otherwise (including for an
// empty array).
static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
  int32_t idx = 0;

  while (idx < arrSize) {
    const SRpcMsg* pCur = pMsgPArr[idx];
    if (pCur->msgType == TDMT_SYNC_CONFIG_CHANGE || pCur->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
      return false;
    }
    ++idx;
  }

  return true;
}

752
// Proposes `pMsg` to the raft group through `pSyncNode`.
//
// Only a leader accepts proposals. While a config change is in flight
// (pSyncNode->changing) every message except TDMT_SYNC_CONFIG_CHANGE_FINISH
// is rejected, and nodes that have not finished restoring (except vgId 1)
// reject everything.
//
// Returns:
//    1  the message was handled directly on the optimized one-replica path;
//       pMsg->info.conn.applyIndex/applyTerm are filled in
//    0  the request was handed to the syncEqMsg callback
//   -1  failure, terrno is set (TSDB_CODE_SYN_NOT_LEADER,
//       TSDB_CODE_SYN_PROPOSE_NOT_READY, TSDB_CODE_SYN_RECONFIG_NOT_READY or
//       TSDB_CODE_SYN_INTERNAL_ERROR)
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  int32_t ret = 0;

  do {
    char eventLog[128];
    snprintf(eventLog, sizeof(eventLog), "propose message, type:%s", TMSG_INFO(pMsg->msgType));
    syncNodeEventLog(pSyncNode, eventLog);
  } while (0);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
      ret = -1;
      terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
      sError("vgId:%d, failed to sync propose since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
      goto _END;
    }

    // config change
    if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
      if (!syncNodeCanChange(pSyncNode)) {
        ret = -1;
        terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
        sError("vgId:%d, failed to sync reconfig since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
        goto _END;
      }

      ASSERT(!pSyncNode->changing);
      // NOTE(review): `changing` stays true if one of the failure paths below
      // is taken for this config-change message - confirm that is intended.
      pSyncNode->changing = true;
    }

    // not restored, vnode enable
    if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
      ret = -1;
      terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
      sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
             pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
      goto _END;
    }

    // register a response stub so the reply can be matched to this request
    SRespStub stub;
    stub.createTime = taosGetTimestampMs();
    stub.rpcMsg = *pMsg;
    uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);

    // optimized one replica
    if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
      // Start from a defined value: the failure branch logs retIndex even
      // though syncNodeOnClientRequest() may not have written it.
      SyncIndex retIndex = SYNC_INDEX_INVALID;
      int32_t   code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex);
      if (code == 0) {
        pMsg->info.conn.applyIndex = retIndex;
        pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
        // the request never went through the queue, so drop the rpc copy
        // and the response stub right away
        rpcFreeCont(rpcMsg.pCont);
        syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
        ret = 1;
        sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
               TMSG_INFO(pMsg->msgType));
      } else {
        ret = -1;
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
               TMSG_INFO(pMsg->msgType));
      }

    } else {
      if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
        ret = 0;
      } else {
        ret = -1;
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
        sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
      }
    }

    syncClientRequestDestroy(pSyncMsg);
    goto _END;

  } else {
    ret = -1;
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state),
           TMSG_INFO(pMsg->msgType));
    goto _END;
  }

_END:
  return ret;
}

844 845 846 847 848 849 850 851 852 853 854 855
// Puts a per-peer heartbeat timer into its initial, stopped state:
// no timer handle, counter and logic clock at 0, period taken from the
// node's heartbeat base line, callback syncNodeEqPeerHeartbeatTimer and
// destination `destId`. Always returns 0.
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;
  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;

  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

// Arms the per-peer heartbeat timer `pSyncTimer`.
//
// A fresh SSyncHbTimerData is allocated, filled with the node, the timer,
// the destination and the timer's current logic clock, stored in
// pSyncTimer->pData and passed to the timer callback as its parameter.
//
// Returns 0 when the timer was (re)armed, and also when the sync environment
// is not initialised (only an error is logged in that case, as before).
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the timer data cannot
// be allocated.
//
// NOTE(review): a previous pSyncTimer->pData is overwritten without being
// freed here; presumably the timer callback owns it - confirm.
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
  if (pData == NULL) {
    // the allocation result used to be dereferenced unchecked
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start ctrl hb timer since out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

// Stops the per-peer heartbeat timer `pSyncTimer`: bumps its logic clock,
// stops the underlying timer and clears the handle.
// pSyncTimer->pData is not freed here. Always returns 0.
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  return 0;
}

S
Shengliang Guan 已提交
880 881
// Allocates and initialises a sync node from `pSyncInfo`.
//
// Steps, in order:
//   1. make sure the sync directory exists
//   2. create raft_config.json from pSyncInfo->syncCfg if it is missing,
//      otherwise reconcile the file with the input config (the input wins when
//      it has replicas and differs from the file; else the file is used)
//   3. copy paths, WAL and message callbacks from pSyncInfo
//   4. open the raft config and derive own id, peers and replica ids
//   5. take over pSyncInfo->pFsm (pSyncInfo->pFsm is set to NULL)
//   6. create raft store, vote managers, index managers and log store
//   7. seed commitIndex from the FSM snapshot, if the FSM provides one
//   8. initialise timers, callbacks, response manager, snapshot sender/receiver
//
// Returns the new node, or NULL on failure. On failure a pSyncInfo->pFsm that
// was not yet taken over is freed and the partially built node is passed to
// syncNodeClose().
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set the vgId first so that the error/info logs below report the real
  // vgroup instead of the zero left by calloc.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
      // pSyncNode->pRaftCfg is still NULL at this point (the node was just
      // calloc'ed), so it cannot be read directly. Load the file that was
      // just written, the same way the "file exists" branch does.
      pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
      if (pSyncNode->pRaftCfg == NULL) {
        sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
      raftCfgClose(pSyncNode->pRaftCfg);
      pSyncNode->pRaftCfg = NULL;
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    // the config is reopened below once the paths are set up
    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo (vgId was already assigned right after allocation)
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int j = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  // ownership of the FSM moves to the node; the _error path only frees
  // pSyncInfo->pFsm when this transfer has not happened yet
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // the commit index starts invalid and is raised to the snapshot's last
  // applied index when the FSM reports one
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  // all TSDB_MAX_REPLICA slots are initialised; slots beyond replicaNum get
  // the zeroed raft id left by calloc
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // init callback
  pSyncNode->FpOnPing = syncNodeOnPing;
  pSyncNode->FpOnPingReply = syncNodeOnPingReply;
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
  pSyncNode->FpOnTimeout = syncNodeOnTimer;
  pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;
  pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVote;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReply;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntries;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReply;

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  // NOTE(review): a NULL sender is stored as-is; syncNodeClose() skips NULL
  // entries, other users are not visible here.
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // raft itself is started later, in syncNodeStart

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  syncNodeEventLog(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // only non-NULL while the FSM has not been handed over to the node
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
// Raises pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex when the
// snapshot is ahead of it. Does nothing when the node has no FSM or the FSM
// has no FpGetSnapshotInfo callback.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  // Zero-initialised, as in syncNodeOpen, so no indeterminate value is ever
  // read from it.
  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);

  // If ASSERT is compiled out, a failed query must not move the commit index.
  if (code == 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
1191 1192
// Starts raft on an opened node.
// With more than one replica the node starts as a follower. A single-replica
// node moves to the next term, becomes leader immediately, appends a noop
// entry and tries to advance the commit index (Raft 3.6.2, committing entries
// from previous terms). In both cases the ping timer is started afterwards.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

M
Minghao Li 已提交
1210 1211 1212 1213 1214 1215 1216 1217 1218
// Starts the node in stand-by mode: it is forced into the follower state,
// its heartbeat timers are stopped, the election timer is restarted with
// TIMER_MAX_MS, and the ping timer is started.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t standByElectMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, standByElectMS);
  ASSERT(ret == 0);

  ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
}

M
Minghao Li 已提交
1225
// Tears down a sync node and frees it. A NULL node is ignored.
// Order: close the raft store, destroy the response/vote/index managers, the
// log store and the raft config, stop the ping/elect/heartbeat timers, free
// the FSM, destroy snapshot senders and the receiver, then free the node.
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  syncNodeEventLog(pSyncNode, "sync close");

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);

  // helper objects; each pointer is cleared right after its destructor ran
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;

  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;

  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;

  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;

  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;

  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // timers
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

  // snapshot senders, one slot per possible replica
  for (int slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
    if ((pSyncNode->senders)[slot] == NULL) {
      continue;
    }
    snapshotSenderDestroy((pSyncNode->senders)[slot]);
    (pSyncNode->senders)[slot] = NULL;
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1274
// option
M
Minghao Li 已提交
1275 1276
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
1277
// Returns the snapshot strategy stored in the node's raft config.
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1278

M
Minghao Li 已提交
1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293
// ping --------------
// Sends the ping message `pMsg` to `destRaftId`.
// The ping is logged, converted to an rpc message (logged as well) and handed
// to syncNodeSendMsgById(), whose result is returned.
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg pingRpc;
  syncPing2RpcMsg(pMsg, &pingRpc);
  syncRpcMsgLog2((char*)"==syncNodePing==", &pingRpc);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &pingRpc);
}

// Builds a ping addressed from this node to itself, sends it and destroys the
// message again. Returns the result of syncNodePing().
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pSelfPing = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId);

  int32_t ret = syncNodePing(pSyncNode, &pSelfPing->destId, pSelfPing);
  ASSERT(ret == 0);

  syncPingDestroy(pSelfPing);
  return ret;
}

// Pings every peer (all replicas except this node), one message per peer.
// Returns the result of the last ping sent, or 0 when there are no peers.
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  for (int peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SRaftId*  pPeerId = &(pSyncNode->peersId[peer]);
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pPeerId, pSyncNode->vgId);

    ret = syncNodePing(pSyncNode, pPeerId, pPing);
    ASSERT(ret == 0);

    syncPingDestroy(pPing);
  }

  return ret;
}

// Pings every replica listed in the raft config, this node included.
// Returns the result of the last ping sent, or 0 when the config has no
// replicas.
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t ret = 0;

  for (int replica = 0; replica < pSyncNode->pRaftCfg->cfg.replicaNum; ++replica) {
    SRaftId*  pReplicaId = &(pSyncNode->replicasId[replica]);
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pReplicaId, pSyncNode->vgId);

    ret = syncNodePing(pSyncNode, pReplicaId, pPing);
    ASSERT(ret == 0);

    syncPingDestroy(pPing);
  }

  return ret;
}

// timer control --------------
// (Re)arms the ping timer and copies the user-side ping logic clock into the
// timer-side one. When the sync environment is not initialised only an error
// is logged. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);

  return 0;
}

// Stops the ping timer: bumps the user-side ping logic clock, stops the
// underlying timer and clears the handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;

  return 0;
}

// Arms the election timer to fire after `ms` milliseconds.
//
// A fresh SElectTimer carrying the node and its current election logic clock
// is allocated and passed to the timer callback as its parameter.
//
// Returns 0 when the timer was armed, and also when the sync environment is
// not initialised (only an error is logged in that case, as before).
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the timer parameter
// cannot be allocated.
//
// NOTE(review): the SElectTimer is not freed in this function; presumably the
// timer callback releases it - confirm.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    // the allocation result used to be dereferenced unchecked
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start elect timer since out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stops the election timer: bumps the election logic clock, stops the
// underlying timer and clears the handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stops the election timer and starts it again with a period of `ms`
// milliseconds. The results of the stop/start calls are not propagated;
// this function always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);

  return 0;
}

M
Minghao Li 已提交
1382 1383
// Restarts the election timer with a new timeout and logs the chosen value.
// A stand-by node uses TIMER_MAX_MS; any other node draws a random timeout
// from syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = 0;
  if (!pSyncNode->pRaftCfg->isStandBy) {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  } else {
    electMS = TIMER_MAX_MS;
  }

  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine,
           2 * pSyncNode->electBaseLine, electMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return ret;
}

M
Minghao Li 已提交
1403
// (Re)arms the node-wide heartbeat timer and copies the user-side heartbeat
// logic clock into the timer-side one. When the sync environment is not
// initialised only an error is logged. The "start heartbeat timer" event is
// logged in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  syncNodeEventLog(pSyncNode, logBuf);

  return 0;
}

M
Minghao Li 已提交
1422
// Starts the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer. The node-wide heartbeat timer path is
// compiled out. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif

  for (int peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pHbTimer);
  }

  return ret;
}

M
Minghao Li 已提交
1440 1441
// Stops the per-peer heartbeat timer of every peer for which
// syncNodeGetHbTimer() returns a timer. The node-wide heartbeat timer path is
// compiled out. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;

#if 0
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
#endif

  for (int peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pHbTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pHbTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pHbTimer);
  }

  return ret;
}

1459 1460 1461 1462 1463 1464
// Stops and then starts the heartbeat timers of all peers. The results of the
// two calls are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);

  return 0;
}

M
Minghao Li 已提交
1465 1466 1467 1468
// utils --------------
// Send pMsg to the peer identified by destRaftId through the node's
// syncSendMSg callback.
//
// Before the hand-off the message content is passed through syncUtilMsgHtoN()
// and the message is flagged as noResp.
//
// Returns 0 after the callback was invoked, -1 if no callback is installed.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilraftId2EpSet(destRaftId, &destEpSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&destEpSet, pMsg);

  return 0;
}

// Send pMsg to the peer described by nodeInfo through the node's syncSendMSg
// callback.
//
// Before the hand-off the message content is passed through syncUtilMsgHtoN()
// and the message is flagged as noResp.
//
// Returns 0 after the callback was invoked, -1 if no callback is installed
// (same contract as syncNodeSendMsgById, so the failure is visible to the
// caller instead of being reported as success).
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);

  return 0;
}

M
Minghao Li 已提交
1498
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
C
Cary Xu 已提交
1499
  char   u64buf[128] = {0};
M
Minghao Li 已提交
1500 1501
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
1502 1503 1504
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
1505
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
1506
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
1507 1508 1509
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
1510 1511 1512
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
1513
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1514
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
S
Shengliang Guan 已提交
1515 1516
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg);
    cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf);
M
Minghao Li 已提交
1517

S
Shengliang Guan 已提交
1518
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1519
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
S
Shengliang Guan 已提交
1520 1521
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg);
    cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf);
M
Minghao Li 已提交
1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
1540

M
Minghao Li 已提交
1541 1542 1543 1544 1545 1546
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
1547

M
Minghao Li 已提交
1548 1549 1550 1551 1552 1553 1554
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
1555
    // life cycle
S
Shengliang Guan 已提交
1556
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->rid);
M
Minghao Li 已提交
1557 1558
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
1559 1560
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
1561
    cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state));
M
Minghao Li 已提交
1562
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
S
Shengliang Guan 已提交
1574
    snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->commitIndex);
M
Minghao Li 已提交
1575 1576
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
1577 1578 1579 1580 1581
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
1582 1583 1584 1585
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
S
Shengliang Guan 已提交
1586
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
1587
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1588
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1589 1590 1591
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
S
Shengliang Guan 已提交
1592
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
1593 1594 1595 1596 1597 1598
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
S
Shengliang Guan 已提交
1599
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
1600 1601 1602
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
S
Shengliang Guan 已提交
1603
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
M
Minghao Li 已提交
1604 1605 1606 1607 1608 1609
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
S
Shengliang Guan 已提交
1610
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
1611
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
S
Shengliang Guan 已提交
1612
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
1613 1614 1615
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
S
Shengliang Guan 已提交
1616
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646

    // restoreFinish
    cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);

    // snapshot senders
    cJSON* pSenders = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "senders", pSenders);
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
    }

    // snapshot receivers
    cJSON* pReceivers = cJSON_CreateArray();
1647
    cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
M
Minghao Li 已提交
1648 1649 1650

    // changing
    cJSON_AddNumberToObject(pRoot, "changing", pSyncNode->changing);
M
Minghao Li 已提交
1651 1652 1653 1654 1655 1656 1657
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
1658 1659 1660 1661 1662 1663 1664
// Render a sync node as JSON text.
// The JSON tree built by syncNode2Json() is printed with cJSON_Print() and
// then released; the returned string is whatever cJSON_Print() produced and
// must be released by the caller.
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pNodeJson = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pNodeJson);

  cJSON_Delete(pNodeJson);
  return pText;
}

1665
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1666 1667 1668 1669
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1670
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1671
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1672 1673
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1674 1675 1676 1677 1678 1679 1680

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }
M
Minghao Li 已提交
1681

M
Minghao Li 已提交
1682
  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
1683 1684 1685 1686
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1687

1688 1689 1690
  char*   peerStateStr = syncNodePeerState2Str(pSyncNode);
  int32_t userStrLen = strlen(str) + strlen(peerStateStr);

M
Minghao Li 已提交
1691
  if (userStrLen < 256) {
M
Minghao Li 已提交
1692
    char logBuf[256 + 256];
1693 1694
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1695 1696
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1697 1698 1699
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1700
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1701
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1702 1703 1704 1705
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1706
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1707 1708 1709
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
1710
    // sDebug("%s", logBuf);
M
Minghao Li 已提交
1711 1712
    // sInfo("%s", logBuf);
    sTrace("%s", logBuf);
M
Minghao Li 已提交
1713

M
Minghao Li 已提交
1714
  } else {
M
Minghao Li 已提交
1715
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1716
    char* s = (char*)taosMemoryMalloc(len);
1717 1718
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1719 1720
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1721 1722 1723
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
1724
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
1725
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1726 1727 1728 1729
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1730
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
1731 1732 1733
    } else {
      snprintf(s, len, "%s", str);
    }
1734
    // sDebug("%s", s);
M
Minghao Li 已提交
1735 1736
    // sInfo("%s", s);
    sTrace("%s", s);
M
Minghao Li 已提交
1737 1738
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1739

M
Minghao Li 已提交
1740
  taosMemoryFree(peerStateStr);
M
Minghao Li 已提交
1741
  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1742 1743
}

1744
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
M
Minghao Li 已提交
1745 1746 1747 1748
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1749 1750 1751
  int32_t userStrLen = strlen(str);

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
M
Minghao Li 已提交
1752
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
M
Minghao Li 已提交
1753 1754
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
M
Minghao Li 已提交
1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767

  SyncIndex logLastIndex = SYNC_INDEX_INVALID;
  SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pLogStore != NULL) {
    logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
  }

  char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
  char* printStr = "";
  if (pCfgStr != NULL) {
    printStr = pCfgStr;
  }
M
Minghao Li 已提交
1768 1769

  if (userStrLen < 256) {
M
Minghao Li 已提交
1770
    char logBuf[256 + 256];
1771 1772
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(logBuf, sizeof(logBuf),
M
Minghao Li 已提交
1773 1774
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1775 1776 1777
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1778
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1779
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1780 1781 1782 1783
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1784
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1785 1786 1787
    } else {
      snprintf(logBuf, sizeof(logBuf), "%s", str);
    }
M
Minghao Li 已提交
1788 1789 1790
    sError("%s", logBuf);

  } else {
M
Minghao Li 已提交
1791
    int   len = 256 + userStrLen;
M
Minghao Li 已提交
1792
    char* s = (char*)taosMemoryMalloc(len);
1793 1794
    if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
      snprintf(s, len,
M
Minghao Li 已提交
1795 1796
               "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
               ", snap:%" PRId64 ", snap-tm:%" PRIu64
M
Minghao Li 已提交
1797 1798 1799
               ", sby:%d, "
               "stgy:%d, bch:%d, "
               "r-num:%d, "
M
Minghao Li 已提交
1800
               "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
1801
               pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
M
Minghao Li 已提交
1802 1803 1804 1805
               pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
               snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
               pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
               pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
M
Minghao Li 已提交
1806
               pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
1807 1808 1809
    } else {
      snprintf(s, len, "%s", str);
    }
M
Minghao Li 已提交
1810 1811 1812
    sError("%s", s);
    taosMemoryFree(s);
  }
M
Minghao Li 已提交
1813 1814

  taosMemoryFree(pCfgStr);
M
Minghao Li 已提交
1815 1816
}

1817
inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1818 1819
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
M
Minghao Li 已提交
1820 1821 1822 1823 1824 1825 1826 1827

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);

M
Minghao Li 已提交
1828
  snprintf(s, len,
M
Minghao Li 已提交
1829 1830 1831 1832
           "vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
           ", sby:%d, "
           "r-num:%d, "
           "lcfg:%" PRId64 ", chging:%d, rsto:%d",
1833 1834 1835
           pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
           logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
           pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
M
Minghao Li 已提交
1836

M
Minghao Li 已提交
1837 1838 1839
  return s;
}

1840
// Tell whether this node is one of the replicas listed in `config`.
//
// Membership is determined twice: once by comparing FQDN and port with the
// node's own node info, and once by comparing the raft id derived from each
// entry with the node's own raft id. Both answers are asserted to agree, and
// the first one is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEndpoint = false;
  bool foundByRaftId = false;

  // pass 1: match on fqdn + port
  for (int i = 0; !foundByEndpoint && i < config->replicaNum; ++i) {
    if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) != 0) {
      continue;
    }
    if ((config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
      foundByEndpoint = true;
    }
  }

  // pass 2: match on raft id built from the entry's address and our vgId
  for (int i = 0; !foundByRaftId && i < config->replicaNum; ++i) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort);
    candidate.vgId = pSyncNode->vgId;

    if (syncUtilSameId(&candidate, &(pSyncNode->myRaftId))) {
      foundByRaftId = true;
    }
  }

  ASSERT(foundByEndpoint == foundByRaftId);
  return foundByEndpoint;
}

1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879
// Compare two sync configurations.
// Returns true when the replica count or myIndex differs, or when any of the
// first pOldCfg->replicaNum entries differs in FQDN or port; false otherwise.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];

    bool sameFqdn = (strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) == 0);
    if (!sameFqdn || pBefore->nodePort != pAfter->nodePort) {
      return true;
    }
  }

  return false;
}

M
Minghao Li 已提交
1880
// Apply a new replica configuration to a sync node.
//
// pSyncNode:             node whose configuration is replaced.
// pNewConfig:            configuration to install (copied into pRaftCfg->cfg).
// lastConfigChangeIndex: index recorded as pRaftCfg->lastConfigIndex and
//                        appended through raftCfgAddConfigIndex().
//
// If the new configuration equals the current one (syncIsConfigChanged), only
// an info line is logged and nothing changes.
//
// Otherwise the configuration is stored, the standby flag is updated, and:
//  - if this node is part of the new configuration, its identity, peers,
//    replica ids, quorum, index/vote managers and snapshot senders are rebuilt,
//    the config is persisted, and the node re-enters its role (leader:
//    syncNodeBecomeLeader + noop + commit advance; else syncNodeBecomeFollower);
//  - if it is not, the config is persisted and an event is logged.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);

  // this node is being removed from the replica set
  bool isDrop = (IamInOld && !IamInNew);

  // log begin config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "begin do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    //-----------------------------------------

    // save snapshot senders (and the replica ids they belonged to)
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];

      char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old");
      syncNodeEventLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int peerCnt = 0;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerCnt] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerCnt++;
      }
    }
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders

    // clear new
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // reset new: a replica that already existed keeps its old sender, moved to
    // the replica's new slot.
    // NOTE(review): all TSDB_MAX_REPLICA old slots are scanned and a matched
    // old sender is assumed to be non-NULL — confirm this invariant holds.
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      // reset sender
      bool reset = false;
      for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
        if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
          char     host[128];
          uint16_t port;
          syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);

          do {
            char eventLog[256];
            snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p",
                     (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
            syncNodeEventLog(pSyncNode, eventLog);
          } while (0);

          (pSyncNode->senders)[i] = oldSenders[j];
          oldSenders[j] = NULL;
          reset = true;

          // reset replicaIndex
          int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
          (pSyncNode->senders)[i]->replicaIndex = i;

          do {
            char eventLog[256];
            snprintf(eventLog, sizeof(eventLog),
                     "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
                     port, (pSyncNode->senders)[i], reset);
            syncNodeEventLog(pSyncNode, eventLog);
          } while (0);
        }
      }
    }

    // create new: every slot that did not inherit a sender gets a fresh one
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);

        char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new");
        syncNodeEventLog(pSyncNode, eventLog);
        taosMemoryFree(eventLog);
      }
    }

    // free old: senders that were not carried over
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        snapshotSenderDestroy(oldSenders[i]);

        do {
          char eventLog[128];
          snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
          syncNodeEventLog(pSyncNode, eventLog);
        } while (0);

        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);

    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    raftCfgPersist(pSyncNode->pRaftCfg);

    char  tmpbuf[512];
    char* oldStr = syncCfg2SimpleStr(&oldConfig);
    char* newStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
    taosMemoryFree(oldStr);
    taosMemoryFree(newStr);
    syncNodeEventLog(pSyncNode, tmpbuf);
  }

  // log end config change
  do {
    char  eventLog[256];
    char* pOldCfgStr = syncCfg2SimpleStr(&oldConfig);
    char* pNewCfgStr = syncCfg2SimpleStr(pNewConfig);
    snprintf(eventLog, sizeof(eventLog), "end do config change, from %s to %s", pOldCfgStr, pNewCfgStr);
    syncNodeEventLog(pSyncNode, eventLog);
    taosMemoryFree(pOldCfgStr);
    taosMemoryFree(pNewCfgStr);
  } while (0);
  return;
}

M
Minghao Li 已提交
2097 2098 2099 2100
// raft state change --------------
// Move to a strictly newer term: store it, become follower (the reason string
// carries the new term) and clear the vote in the raft store.
// A term that is not greater than the stored one is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRIu64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

2108 2109 2110 2111 2112 2113
// Store `term` when it is strictly newer than the current one. Unlike
// syncNodeUpdateTerm, this function does not call syncNodeBecomeFollower or
// raftStoreClearVote.
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
2114
// Step down in reaction to `newTerm`.
//  - newTerm older than the current term: log and ignore.
//  - newTerm newer: store it, become follower, clear the vote.
//  - newTerm equal: become follower only if the node is not one already.
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  char logBuf[128];

  if (pSyncNode->pRaftStore->currentTerm > newTerm) {
    snprintf(logBuf, sizeof(logBuf), "step down, ignore, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm,
             pSyncNode->pRaftStore->currentTerm);
    syncNodeEventLog(pSyncNode, logBuf);
    return;
  }

  snprintf(logBuf, sizeof(logBuf), "step down, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm,
           pSyncNode->pRaftStore->currentTerm);
  syncNodeEventLog(pSyncNode, logBuf);

  if (pSyncNode->pRaftStore->currentTerm < newTerm) {
    // newer term: adopt it and drop the vote of the old term
    raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

    char reason[64];
    snprintf(reason, sizeof(reason), "step down, update term to %" PRIu64, newTerm);
    syncNodeBecomeFollower(pSyncNode, reason);

    raftStoreClearVote(pSyncNode->pRaftStore);
    return;
  }

  // same term: only the role may need to change
  if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeBecomeFollower(pSyncNode, "step down");
  }
}

2144 2145
// Forward to syncRespCleanRsp on this node's response manager.
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

2146
// Switch this node to the follower role.
//
// Clears the leader cache when stepping down from leader, sets the state,
// stops the heartbeat timer, resets the election timer, cleans pending client
// responses, fires the FSM become-follower callback (when set), invalidates
// minMatchIndex and writes a "become follower <reason>" event.
//
// debugStr: reason appended to the event log; NULL is logged as an empty
//           reason.
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  // maybe clear leader cache
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  const char* reason = (debugStr != NULL) ? debugStr : "";
  size_t      reasonLen = strlen(reason);
  if (reasonLen < 256) {
    char eventLog[256 + 64];
    snprintf(eventLog, sizeof(eventLog), "become follower %s", reason);
    syncNodeEventLog(pSyncNode, eventLog);
  } else {
    // The size handed to snprintf must be the allocated size; passing only the
    // reason length would cut off the tail of the message.
    size_t bufLen = reasonLen + 64;
    char*  eventLog = taosMemoryMalloc(bufLen);
    if (eventLog != NULL) {
      snprintf(eventLog, bufLen, "become follower %s", reason);
      syncNodeEventLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } else {
      // out of memory: still record the transition, without the reason
      syncNodeEventLog(pSyncNode, "become follower");
    }
  }
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
2204
// Promote this node to leader.
//
// Records the leader timestamp, clears restoreFinish, sets the state and the
// leader cache, re-initializes next/match indexes and the per-peer send state,
// raises this node's snapshot-sender private term, force-stops a started
// new-node snapshot receiver, replaces the election timer with the heartbeat
// timer, sends heartbeats immediately, fires the FSM become-leader callback
// (when set) and writes a "become leader <reason>" event.
//
// debugStr: reason appended to the event log; NULL is logged as an empty
//           reason.
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderTime = taosGetTimestampMs();

  // reset restoreFinish
  pSyncNode->restoreFinish = false;

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // next index: last index (log or snapshot, maybe wal is deleted) + 1.
  // Nothing inside the loop changes the result, so it is computed once.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  (void)code;
  (void)lastTerm;

  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    // just do it!
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

  // update sender private term: go past the largest private term of any sender
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      // NOTE(review): the NULL guard is defensive; confirm whether sender
      // slots can be unset for indexes below replicaNum.
      if ((pSyncNode->senders)[i] != NULL && (pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }

  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);

  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);

  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // trace log
  const char* reason = (debugStr != NULL) ? debugStr : "";
  size_t      reasonLen = strlen(reason);
  if (reasonLen < 256) {
    char eventLog[256 + 64];
    snprintf(eventLog, sizeof(eventLog), "become leader %s", reason);
    syncNodeEventLog(pSyncNode, eventLog);
  } else {
    // The size handed to snprintf must be the allocated size; passing only the
    // reason length would cut off the tail of the message.
    size_t bufLen = reasonLen + 64;
    char*  eventLog = taosMemoryMalloc(bufLen);
    if (eventLog != NULL) {
      snprintf(eventLog, bufLen, "become leader %s", reason);
      syncNodeEventLog(pSyncNode, eventLog);
      taosMemoryFree(eventLog);
    } else {
      // out of memory: still record the transition, without the reason
      syncNodeEventLog(pSyncNode, "become leader");
    }
  }
}

// Candidate -> leader transition. Asserts that the node is a candidate with a
// majority of granted votes, becomes leader, appends a noop entry, tries to
// advance the commit index and, with more than one replica, replicates.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  // a single replica has nobody to replicate to
  if (pSyncNode->replicaNum <= 1) {
    return;
  }
  syncNodeReplicate(pSyncNode);
}

M
Minghao Li 已提交
2304 2305
// True when this sync node's vgId equals 1.
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  const bool isMnode = (pSyncNode->vgId == 1);
  return isMnode;
}

M
Minghao Li 已提交
2306 2307 2308 2309 2310 2311 2312
// Reset the send bookkeeping (last sent index and time) of every peer slot.
// Always returns 0.
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }

  return 0;
}

// Follower -> candidate transition; asserts that the node is a follower,
// flips the state and records the transition in the event log.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeEventLog(pSyncNode, "follower to candidate");
}

// Leader -> follower transition; asserts that the node is the leader, runs
// the common become-follower path and records the transition in the event log.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");
  syncNodeEventLog(pSyncNode, "leader to follower");
}

// Candidate -> follower transition; asserts that the node is a candidate, runs
// the common become-follower path and records the transition in the event log.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
  syncNodeEventLog(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
2337 2338 2339

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
2340
// Record a vote for pRaftId in the raft store.
// Preconditions (asserted): `term` is the current term and no vote has been
// recorded yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
2347
// simulate get vote from outside
M
Minghao Li 已提交
2348 2349 2350
// Vote for this node in the current term, then feed a locally built, granted
// vote reply (from self, to self) into the vote-granted and vote-respond
// trackers, as if it had arrived from outside.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pReply->srcId = pSyncNode->myRaftId;
  pReply->destId = pSyncNode->myRaftId;
  pReply->term = pSyncNode->pRaftStore->currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  syncRequestVoteReplyDestroy(pReply);
}

M
Minghao Li 已提交
2362
// snapshot --------------
M
Minghao Li 已提交
2363 2364

// return if has a snapshot
M
Minghao Li 已提交
2365 2366
// True when the FSM provides a snapshot-info callback and the snapshot it
// reports has lastApplyIndex >= SYNC_INDEX_BEGIN.
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return false;
  }

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);

  return snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

M
Minghao Li 已提交
2377 2378
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
2379
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
2380
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
2381 2382
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
2383 2384 2385 2386 2387 2388 2389
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
2390 2391
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
2392 2393
// Term that belongs to the last index:
//  - no snapshot: the log store's last term;
//  - snapshot present: the log store's last term when the log extends past
//    the snapshot, otherwise the snapshot's lastApplyTerm.
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  if (!syncNodeHasSnapshot(pSyncNode)) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex > snapshot.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }
  return snapshot.lastApplyTerm;
}

// get last index and term along with snapshot
// Fill *pLastIndex and *pLastTerm from syncNodeGetLastIndex and
// syncNodeGetLastTerm (in that order). Always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);

  *pLastTerm = syncNodeGetLastTerm(pSyncNode);

  return 0;
}
M
Minghao Li 已提交
2422

M
Minghao Li 已提交
2423
// return append-entries first try index
M
Minghao Li 已提交
2424 2425 2426 2427 2428
// One past the last index (log or snapshot) of this node.
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
2429 2430
// if index > 0, return index - 1
// else, return -1
2431 2432 2433 2434 2435 2436 2437 2438 2439
// index - 1, clamped from below at SYNC_INDEX_INVALID. pSyncNode is not used.
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  const SyncIndex candidate = index - 1;
  return (candidate < SYNC_INDEX_INVALID) ? SYNC_INDEX_INVALID : candidate;
}

M
Minghao Li 已提交
2440 2441 2442 2443
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456
// Term of the entry that precedes `index`.
//  - index below SYNC_INDEX_BEGIN: SYNC_TERM_INVALID
//  - index equal to SYNC_INDEX_BEGIN: 0
//  - otherwise: the term of log entry index-1; when the log store cannot
//    supply that entry, the snapshot's lastApplyTerm if the snapshot ends
//    exactly at index-1; failing both, an error is logged and
//    SYNC_TERM_INVALID is returned.
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  const SyncIndex preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);

  if (code == 0) {
    // entry found in the log: copy the term out before releasing the entry
    ASSERT(pPreEntry != NULL);
    SyncTerm entryTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return entryTerm;
  }

  // entry not available from the log store: fall back to the snapshot
  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex == preIndex) {
      return snapshot.lastApplyTerm;
    }
  }

  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf),
           "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRIu64, index,
           snapshot.lastApplyIndex, snapshot.lastApplyTerm);
  syncNodeErrorLog(pSyncNode, logBuf);

  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
2487 2488 2489 2490

// get pre index and term of "index"
// Fill *pPreIndex and *pPreTerm with the index and term that precede `index`
// (see syncNodeGetPreIndex / syncNodeGetPreTerm). Always returns 0; a lookup
// failure is only visible as SYNC_TERM_INVALID in *pPreTerm.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);

  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);

  return 0;
}

M
Minghao Li 已提交
2495 2496 2497
// for debug --------------
// Print the serialized node (and its length) to stdout and flush all streams.
void syncNodePrint(SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  printf("syncNodePrint | len:%d | %s \n", textLen, text);
  fflush(NULL);

  taosMemoryFree(text);
}

// Like syncNodePrint, with the caller-supplied tag `s` placed before the
// serialized node.
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char*   text = syncNode2Str(pObj);
  int32_t textLen = (int32_t)strlen(text);

  printf("syncNodePrint2 | len:%d | %s | %s \n", textLen, s, text);
  fflush(NULL);

  taosMemoryFree(text);
}

// Write the serialized node (and its length) through sTraceLong.
void syncNodeLog(SSyncNode* pObj) {
  char* text = syncNode2Str(pObj);

  sTraceLong("syncNodeLog | len:%d | %s", (int32_t)strlen(text), text);

  taosMemoryFree(text);
}

// Write the tag `s` and the serialized node through sTraceLong; does nothing
// (not even the serialization) unless gRaftDetailLog is set.
void syncNodeLog2(char* s, SSyncNode* pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char* text = syncNode2Str(pObj);
  sTraceLong("syncNodeLog2 | len:%d | %s | %s", (int32_t)strlen(text), s, text);
  taosMemoryFree(text);
}

M
Minghao Li 已提交
2524 2525
// Write the tag `s` and the serialized node through sTraceLong. Unlike
// syncNodeLog2 this is not gated on gRaftDetailLog.
void syncNodeLog3(char* s, SSyncNode* pObj) {
  char* text = syncNode2Str(pObj);

  sTraceLong("syncNodeLog3 | len:%d | %s | %s", (int32_t)strlen(text), s, text);

  taosMemoryFree(text);
}

M
Minghao Li 已提交
2530
// ------ local function ---------
M
Minghao Li 已提交
2531
// enqueue message ----
M
Minghao Li 已提交
2532 2533
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
2534
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
2535
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
2536
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2537 2538
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
2539
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
S
Shengliang Guan 已提交
2540 2541
    if (pSyncNode->syncEqMsg != NULL) {
      int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
2542
      if (code != 0) {
S
Shengliang Guan 已提交
2543
        sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
2544 2545 2546 2547
        rpcFreeCont(rpcMsg.pCont);
        syncTimeoutDestroy(pSyncMsg);
        return;
      }
M
Minghao Li 已提交
2548
    } else {
S
Shengliang Guan 已提交
2549
      sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
M
Minghao Li 已提交
2550
    }
M
Minghao Li 已提交
2551 2552
    syncTimeoutDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2553 2554
    if (syncIsInit()) {
      taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
2555 2556 2557 2558 2559
                   &pSyncNode->pPingTimer);
    } else {
      sError("sync env is stop, syncNodeEqPingTimer");
    }

M
Minghao Li 已提交
2560
  } else {
S
Shengliang Guan 已提交
2561
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64,
M
Minghao Li 已提交
2562
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
2563 2564 2565 2566
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
M
Minghao Li 已提交
2567 2568
  SElectTimer* pElectTimer = (SElectTimer*)param;
  SSyncNode*   pSyncNode = pElectTimer->pSyncNode;
M
Minghao Li 已提交
2569

M
Minghao Li 已提交
2570 2571
  SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
                                            pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
2572 2573
  SRpcMsg      rpcMsg;
  syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
2574
  if (pSyncNode->syncEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
S
Shengliang Guan 已提交
2575
    int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
2576 2577 2578 2579
    if (code != 0) {
      sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2580
      taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2581
      return;
2582
    }
M
Minghao Li 已提交
2583 2584 2585

    do {
      char logBuf[128];
M
Minghao Li 已提交
2586
      snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock);
M
Minghao Li 已提交
2587
      syncNodeEventLog(pSyncNode, logBuf);
M
Minghao Li 已提交
2588 2589
    } while (0);

M
Minghao Li 已提交
2590
  } else {
S
Shengliang Guan 已提交
2591
    sTrace("syncNodeEqElectTimer syncEqMsg is NULL");
M
Minghao Li 已提交
2592
  }
M
Minghao Li 已提交
2593

M
Minghao Li 已提交
2594
  syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2595
  taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
2596

M
Minghao Li 已提交
2597
#if 0
M
Minghao Li 已提交
2598
  // reset timer ms
S
Shengliang Guan 已提交
2599
  if (syncIsInit() && pSyncNode->electBaseLine > 0) {
M
Minghao Li 已提交
2600
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
S
Shengliang Guan 已提交
2601
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
M
Minghao Li 已提交
2602 2603 2604
                 &pSyncNode->pElectTimer);
  } else {
    sError("sync env is stop, syncNodeEqElectTimer");
M
Minghao Li 已提交
2605
  }
M
Minghao Li 已提交
2606
#endif
M
Minghao Li 已提交
2607 2608
}

M
Minghao Li 已提交
2609 2610
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
2611 2612 2613

  syncNodeEventLog(pSyncNode, "eq hb timer");

2614 2615 2616 2617 2618 2619 2620 2621 2622
  if (pSyncNode->replicaNum > 1) {
    if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
        atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
      SyncTimeout* pSyncMsg =
          syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
                            pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
      SRpcMsg rpcMsg;
      syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
      syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
S
Shengliang Guan 已提交
2623 2624
      if (pSyncNode->syncEqMsg != NULL) {
        int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
2625
        if (code != 0) {
S
Shengliang Guan 已提交
2626
          sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
2627 2628 2629 2630 2631
          rpcFreeCont(rpcMsg.pCont);
          syncTimeoutDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
2632
        sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
2633
      }
2634
      syncTimeoutDestroy(pSyncMsg);
M
Minghao Li 已提交
2635

S
Shengliang Guan 已提交
2636 2637
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
2638 2639 2640 2641
                     &pSyncNode->pHeartbeatTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }
2642
    } else {
2643 2644 2645
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
             "",
             pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
2646
    }
M
Minghao Li 已提交
2647 2648 2649
  }
}

2650 2651 2652 2653 2654
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
2655 2656 2657 2658
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

S
Shengliang Guan 已提交
2659
  // syncNodeEventLog(pSyncNode, "eq peer hb timer");
2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
      SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
2671
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
2672 2673 2674 2675 2676 2677 2678
      pSyncMsg->privateTerm = 0;

      SRpcMsg rpcMsg;
      syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);

// eq msg
#if 0
S
Shengliang Guan 已提交
2679 2680
      if (pSyncNode->syncEqCtrlMsg != NULL) {
        int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
2681 2682 2683 2684 2685 2686 2687
        if (code != 0) {
          sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
          rpcFreeCont(rpcMsg.pCont);
          syncHeartbeatDestroy(pSyncMsg);
          return;
        }
      } else {
S
Shengliang Guan 已提交
2688
        sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
2689 2690 2691 2692
      }
#endif

      // send msg
M
Minghao Li 已提交
2693
      syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
2694 2695 2696

      syncHeartbeatDestroy(pSyncMsg);

S
Shengliang Guan 已提交
2697 2698
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock,
             msgLogicClock);
    }
  }
}

M
Minghao Li 已提交
2711 2712
// Build a noop entry for the current term at the log store's write index,
// wrap it into a client request and hand it to the enqueue callback.
// Asserts that the node is leader.
//
// Returns 0 on success or when no enqueue callback is set, -1 when the
// enqueue callback reports an error.
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  ASSERT(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  ASSERT(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->syncEqMsg != NULL) {
    int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsg);
    if (code != 0) {
      // a failed enqueue leaves the payload with the caller
      sError("vgId:%d, sync enqueue noop msg error, code:%d", ths->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      ret = -1;
    }
  } else {
    sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL");
    // nobody took over the rpc payload
    rpcFreeCont(rpcMsg.pCont);
  }

  syncEntryDestory(pEntry);
  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);

  return ret;
}

2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754
// Deleter passed to taosLRUCacheInsert by syncCacheEntry: frees the cached
// value. The key and its length are not needed.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Insert pEntry into the log store's LRU cache, keyed by the entry index, with
// low priority and deleteCacheEntry as deleter. The handle of the inserted
// item is passed back through `h`.
// Returns 0 when the insert reports TAOS_LRU_STATUS_OK, -1 otherwise.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int       entryLen = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);

  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
2755 2756 2757
// Build a noop entry for the current term at the log store's write index, put
// it into the entry cache and, when this node is leader, append it to the log.
//
// Returns 0 on success, -1 when the append fails. The cache handle (or, when
// the entry did not get a handle, the entry itself) is released on every
// path, including the failure path.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      syncNodeErrorLog(ths, "append noop error");
      // do not return here: the handle / entry below still has to be released
      ret = -1;
    }
  }

  if (h) {
    // NOTE(review): after a failed append the entry stays cached although it
    // is not in the log; confirm whether it should be erased from the cache.
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

M
Minghao Li 已提交
2783
// on message ----
M
Minghao Li 已提交
2784 2785 2786
// Handle a received ping: build a ping reply from this node to the sender of
// pMsg and send it. Always returns 0.
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) {
  sTrace("vgId:%d, recv sync-ping", ths->vgId);

  SyncPingReply* pReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);

  SRpcMsg replyRpc;
  syncPingReply2RpcMsg(pReply, &replyRpc);

  /*
    // htonl
    SMsgHead* pHead = rpcMsg.pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = htonl(pHead->vgId);
  */

  syncNodeSendMsgById(&pReply->destId, ths, &replyRpc);
  syncPingReplyDestroy(pReply);

  return 0;
}

M
Minghao Li 已提交
2804
// Handle a received ping reply: only traces the event. pMsg is not inspected.
// Always returns 0.
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("vgId:%d, recv sync-ping-reply", ths->vgId);
  return 0;
}
M
Minghao Li 已提交
2809

2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821
// Handle a received heartbeat.
//  - Always answers with a heartbeat reply carrying this node's current term.
//  - Same term and this node is not leader: reset the election timer and take
//    over the sender's minMatchIndex.
//  - Sender's term is not older and this node is not a follower: enqueue a
//    local step-down command for that term.
// Always returns 0.
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
  syncLogRecvHeartbeat(ths, pMsg, "");

  SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
  pMsgReply->destId = pMsg->srcId;
  pMsgReply->srcId = ths->myRaftId;
  pMsgReply->term = ths->pRaftStore->currentTerm;
  pMsgReply->privateTerm = 8864;  // magic number

  SRpcMsg rpcMsg;
  syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

#if 0
    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeFollowerCommit(ths, pMsg->commitIndex);
    }
#endif
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    // syncNodeStepDown(ths, pMsg->term);
    SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
    pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
    pSyncMsg->sdNewTerm = pMsg->term;

    SRpcMsg rpcMsgLocalCmd;
    syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);

    if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
      int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
      if (code != 0) {
        sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      } else {
        sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
      }
    } else {
      // Nobody took over the rpc payload, so it has to be released here just
      // like on the enqueue-error path.
      sError("vgId:%d, sync enqueue step-down msg skipped, enqueue callback not set", ths->vgId);
      rpcFreeCont(rpcMsgLocalCmd.pCont);
    }

    syncLocalCmdDestroy(pSyncMsg);
  }

  /*
    // htonl
    SMsgHead* pHead = rpcMsg.pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = htonl(pHead->vgId);
  */

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  syncHeartbeatReplyDestroy(pMsgReply);

  return 0;
}

// Handle a heartbeat reply: log it, then store pMsg->startTime as the receive time for
// pMsg->destId in the match-index manager. Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // Refresh the last-reply timestamp used to judge whether the other node is alive.
  // NOTE(review): the time is recorded under destId (the receiver of this reply), while the
  // replying peer is srcId -- confirm that destId is really the intended key.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime);
  return 0;
}

M
Minghao Li 已提交
2878
// Execute a node-local command message. Only SYNC_LOCAL_CMD_STEP_DOWN is recognised: it steps
// the node down to pMsg->sdNewTerm. Any other command is reported through the error log.
// Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd != SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeErrorLog(ths, "error local cmd");
    return 0;
  }

  syncNodeStepDown(ths, pMsg->sdNewTerm);
  return 0;
}

M
Minghao Li 已提交
2891 2892 2893 2894 2895 2896 2897 2898 2899 2900
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2901

M
Minghao Li 已提交
2902
// Handle a client request (see the TLA+ ClientRequest spec above).
//
// Builds a raft entry at the log store's write index with the current term and puts it into the
// entry cache. When this node is leader the entry is appended to the log; with more than one
// replica replication is started immediately, with a single replica the commit index may be
// advanced right away.
//
// pRetIndex (optional): receives the index of the new entry on success, SYNC_INDEX_INVALID otherwise.
// Returns 0 on success, -1 if appending the entry to the log store failed.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
  syncNodeEventLog(ths, "on client request");

  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    // append entry
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      // del resp mgr, call FpCommitCb
      ASSERT(0);
      // do not return here: fall through so the cache handle / entry is released below
      ret = -1;
    } else {
      // if multi replica, start replicate right now
      if (ths->replicaNum > 1) {
        syncNodeReplicate(ths);
      }

      // if only myself, maybe commit right now
      if (ths->replicaNum == 1) {
        syncMaybeAdvanceCommitIndex(ths);
      }
    }
  }

  if (pRetIndex != NULL) {
    if (ret == 0 && pEntry != NULL) {
      *pRetIndex = pEntry->index;
    } else {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
  }

  // the entry is either referenced through the cache handle or owned locally
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}
M
Minghao Li 已提交
2952

S
Shengliang Guan 已提交
2953 2954 2955
// Map a sync state to its lowercase name; any unrecognised value yields "error".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    return "follower";
  }
  if (state == TAOS_SYNC_STATE_CANDIDATE) {
    return "candidate";
  }
  if (state == TAOS_SYNC_STATE_LEADER) {
    return "leader";
  }
  return "error";
}
2965

2966
// Execute a leader-transfer entry on this node.
//
// The transfer is skipped (with an event log) when this node is not a follower, has not finished
// restore, or when the entry's term / index is behind the node's current term / last log index.
// Otherwise, if the transfer target matches this node (by raft id, or by fqdn and port), the
// election timer is restarted with a 1 ms timeout. FpLeaderTransferCb, when set, is then invoked
// with metadata describing the entry.
//
// Always returns 0.
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
    return 0;
  }

  if (!ths->restoreFinish) {
    syncNodeEventLog(ths, "restore not finish, can not do leader transfer");
    return 0;
  }

  if (pEntry->term < ths->pRaftStore->currentTerm) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little term:%" PRIu64 ", can not do leader transfer", pEntry->term);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    syncNodeEventLog(ths, logBuf);
    return 0;
  }

  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);

  do {
    char logBuf[128];
    snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%" PRId64, pEntry->index);
    syncNodeEventLog(ths, logBuf);
  } while (0);

  bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
  bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
                      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;

  bool same = sameId || sameNodeInfo;
  if (same) {
    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);

    char eventLog[256];
    snprintf(eventLog, sizeof(eventLog), "maybe leader transfer to %s:%d %" PRIu64,
             pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
             pSyncLeaderTransfer->newLeaderId.addr);
    syncNodeEventLog(ths, eventLog);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    // Designated initializers: the previous `cbMeta.field = value` form was a list of positional
    // initializers, which stored the values by declaration order rather than by field name.
    SFsmCbMeta cbMeta = {
        .code = 0,
        .currentTerm = ths->pRaftStore->currentTerm,
        .flag = 0,
        .index = pEntry->index,
        .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
        .isWeak = pEntry->isWeak,
        .seqNum = pEntry->seqNum,
        .state = ths->state,
        .term = pEntry->term,
    };
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057
// Locate this node inside pNewCfg and store its position in pNewCfg->myIndex.
// Each configured node's fqdn/port is converted to a raft id (with this node's vgId) and compared
// with ths->myRaftId. Returns 0 when found, -1 when this node is not part of the new config.
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int pos = 0;
  while (pos < pNewCfg->replicaNum) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64(pNewCfg->nodeInfo[pos].nodeFqdn, pNewCfg->nodeInfo[pos].nodePort);
    candidate.vgId = ths->vgId;

    if (syncUtilSameId(&ths->myRaftId, &candidate)) {
      pNewCfg->myIndex = pos;
      return 0;
    }
    ++pos;
  }

  return -1;
}

M
Minghao Li 已提交
3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079
// Apply a config-change-finish entry: decode the SyncReconfigFinish payload from pRpcMsg, notify
// the FSM through FpReConfigCb (when set), clear the node's "changing" flag and write an event
// log describing the old and new configuration. Always returns 0.
static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg);
  ASSERT(pFinish);

  if (ths->pFsm->FpReConfigCb != NULL) {
    // code and flag stay 0 from the zero-initialization
    SReConfigCbMeta meta = {0};

    // describes the finish entry itself
    meta.index = pEntry->index;
    meta.term = pEntry->term;
    meta.seqNum = pEntry->seqNum;
    meta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
    meta.state = ths->state;
    meta.currentTerm = ths->pRaftStore->currentTerm;
    meta.isWeak = pEntry->isWeak;

    // describes the config change that has finished
    meta.oldCfg = pFinish->oldCfg;
    meta.newCfg = pFinish->newCfg;
    meta.newCfgIndex = pFinish->newCfgIndex;
    meta.newCfgTerm = pFinish->newCfgTerm;
    meta.newCfgSeqNum = pFinish->newCfgSeqNum;

    ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, &meta);
  }

  // clear changing
  ths->changing = false;

  char* oldCfgStr = syncCfg2SimpleStr(&pFinish->oldCfg);
  char* newCfgStr = syncCfg2SimpleStr(&pFinish->newCfg);

  char logBuf[512];
  snprintf(logBuf, sizeof(logBuf), "config change finish from %d to %d, index:%" PRId64 ", %s  -->  %s",
           pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex, oldCfgStr, newCfgStr);

  taosMemoryFree(oldCfgStr);
  taosMemoryFree(newCfgStr);
  syncNodeEventLog(ths, logBuf);

  syncReconfigFinishDestroy(pFinish);
  return 0;
}

// Apply a config-change entry: mark the node as "changing", parse the new configuration from
// pRpcMsg->pCont, switch the node to it, and fill pFinish with the old/new configuration plus the
// entry's index, term and seqNum. Always returns 0.
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
                                    SyncReconfigFinish* pFinish) {
  // set changing
  ths->changing = true;

  // snapshot of the configuration in effect before the change
  SSyncCfg cfgBefore = ths->pRaftCfg->cfg;

  // configuration carried by the entry
  SSyncCfg cfgAfter;
  int32_t  ret = syncCfgFromStr(pRpcMsg->pCont, &cfgAfter);
  ASSERT(ret == 0);

  // fix up myIndex inside the new configuration, then switch over to it
  syncNodeUpdateNewConfigIndex(ths, &cfgAfter);
  syncNodeDoConfigChange(ths, &cfgAfter, pEntry->index);

  // report what changed to the caller
  pFinish->newCfgSeqNum = pEntry->seqNum;
  pFinish->newCfgTerm = pEntry->term;
  pFinish->newCfgIndex = pEntry->index;
  pFinish->newCfg = cfgAfter;
  pFinish->oldCfg = cfgBefore;

  return 0;
}
3128

M
Minghao Li 已提交
3129 3130 3131 3132 3133 3134 3135 3136 3137
// Propose a config-change-finish message built from pFinish. If the proposal is rejected an error
// is logged and the node's "changing" flag is cleared. Always returns 0.
static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) {
  SRpcMsg finishRpc;
  syncReconfigFinish2RpcMsg(pFinish, &finishRpc);

  if (syncNodePropose(ths, &finishRpc, false) != 0) {
    sError("syncNodeProposeConfigChangeFinish error");
    ths->changing = false;
  }

  return 0;
}

3141 3142 3143 3144
// True when all of the following hold: the group has exactly one replica, the message type is a
// user commit (per syncUtilUserCommit), and the vgId is not 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
3145
// Apply the committed log entries in [beginIndex, endIndex] to the state machine.
//
// - If the FSM reports a snapshot whose lastApplyIndex is >= beginIndex, the start of the range is
//   moved to lastApplyIndex + 1.
// - Each remaining entry is taken from the log store's LRU cache, or read from the log store when
//   it is not cached; entries that cannot be read are logged and skipped.
// - User-commit entries are handed to FpCommitCb, except on a restored single-replica node whose
//   vgId is not 1 (the fsm is then executed outside of this function).
// - TDMT_SYNC_CONFIG_CHANGE entries apply the config change and, on the leader, propose the
//   matching finish message; TDMT_SYNC_CONFIG_CHANGE_FINISH entries run the finish handling.
// - When the entry at the log store's last index is reached and restore has not finished yet,
//   FpRestoreFinishCb (if set) is called and restoreFinish is set.
//
// flag is forwarded to the commit callback as SFsmCbMeta.flag.
// Returns 0; returns -1 when ths is NULL (checked only if beginIndex <= endIndex).
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      char eventLog[128];
      snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
               snapshot.lastApplyIndex);
      syncNodeEventLog(ths, eventLog);

      // update begin index
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  int32_t code = 0;

  char eventLog[128];
  snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
  syncNodeEventLog(ths, eventLog);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // start from NULL so the check below also catches a getter that reports success
        // without producing an entry
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          if (code != 0 || pEntry == NULL) {
            syncNodeErrorLog(ths, "get log entry error");
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
            continue;
          }
        }

        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          do {
            char logBuf[128];
            snprintf(logBuf, sizeof(logBuf), "commit index:%" PRId64 ", internal:%d", i, internalExecute);
            syncNodeEventLog(ths, logBuf);
          } while (0);

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

            syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
          }
        }

        // config change
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
          SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId);
          ASSERT(pFinish != NULL);

          code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish);
          ASSERT(code == 0);

          if (ths->state == TAOS_SYNC_STATE_LEADER) {
            syncNodeProposeConfigChangeFinish(ths, pFinish);
          }
          syncReconfigFinishDestroy(pFinish);
        }

        // config change finish
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
          if (rpcMsg.pCont != NULL && rpcMsg.contLen > 0) {
            code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
            ASSERT(code == 0);
          }
        }

#if 0
        // execute in pre-commit
        // leader transfer
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
        }
#endif

        // restore finish
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;

            char eventLog[128];
            snprintf(eventLog, sizeof(eventLog), "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms, ",
                     pEntry->index, restoreDelay);
            syncNodeEventLog(ths, eventLog);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        // cached entries are released through their handle, others are owned here
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
      }
    }
  }
  return 0;
}

// True when pRaftId equals one of the ids in ths->replicasId (first replicaNum slots).
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  int slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(&ths->replicasId[slot], pRaftId)) {
      return true;
    }
    ++slot;
  }
  return false;
}

// Return the snapshot sender stored at the replica slot whose id equals pDestId, or NULL when no
// slot matches. All slots are scanned, so the last matching slot wins.
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int                  slot = 0;
  while (slot < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = ths->senders[slot];
    }
    ++slot;
  }
  return pFound;
}
M
Minghao Li 已提交
3310

3311 3312 3313 3314 3315 3316 3317 3318 3319 3320
// Return a pointer to the peer heartbeat timer at the replica slot whose id equals pDestId, or
// NULL when no slot matches. All slots are scanned, so the last matching slot wins.
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pFound = NULL;
  for (int slot = 0; slot < ths->replicaNum; ++slot) {
    if (!syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      continue;
    }
    pFound = &ths->peerHeartbeatTimerArr[slot];
  }
  return pFound;
}

M
Minghao Li 已提交
3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332
// Return a pointer to the peer state at the replica slot whose id equals pDestId, or NULL when no
// slot matches. All slots are scanned, so the last matching slot wins.
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  SPeerState* pFound = NULL;
  int         slot = 0;
  for (; slot < ths->replicaNum; ++slot) {
    if (syncUtilSameId(pDestId, &ths->replicasId[slot])) {
      pFound = &ths->peerStates[slot];
    }
  }
  return pFound;
}

// Decide whether an append-entries message should be sent to pDestId.
// Returns false when no peer state exists for pDestId, or when the same index
// (pMsg->prevLogIndex + 1) was already sent to that peer less than
// SYNC_APPEND_ENTRIES_TIMEOUT_MS ago; returns true otherwise.
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   now = taosGetTimestampMs();

  bool alreadySent = (pPeer->lastSendIndex == nextIndex);
  bool stillFresh = (now - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  return !(alreadySent && stillFresh);
}

M
Minghao Li 已提交
3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363
// Check whether the node may start a change right now. Returns false (after logging) when
//  - the "changing" flag is already set, or
//  - commitIndex is >= SYNC_INDEX_BEGIN but differs from the last log index, or
//  - a snapshot sender for any peer has its "start" flag set.
// Returns true otherwise.
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  if (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  int peer = 0;
  while (peer < pSyncNode->peersNum) {
    SSyncSnapshotSender* pPeerSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[peer]);
    if (pPeerSender != NULL && pPeerSender->start) {
      sError("sync cannot change3");
      return false;
    }
    ++peer;
  }

  return true;
}

3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386
// Short name of a timeout type for logging; unrecognised values map to "unknown".
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  if (timerType == SYNC_TIMEOUT_PING) {
    return "ping";
  }
  if (timerType == SYNC_TIMEOUT_ELECTION) {
    return "elect";
  }
  if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
    return "heartbeat";
  }
  return "unknown";
}

// Event-log a received timer message (type, logic clock, interval, data pointer); s is appended
// as free-form trailing text.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  char        msgBuf[256];
  const char* typeName = syncTimerTypeStr(pMsg->timeoutType);

  snprintf(msgBuf, sizeof(msgBuf), "recv sync-timer {type:%s, lc:%" PRIu64 ", ms:%d, data:%p}, %s", typeName,
           pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3392
// Event-log an outgoing request-vote message; the address shown is decoded from pMsg->destId.
// s is appended as free-form trailing text.
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3403
// Event-log a received request-vote message; the address shown is decoded from pMsg->srcId.
// s is appended as free-form trailing text.
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
  char     peerHost[64];
  uint16_t peerPort;
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  char msgBuf[256];
  snprintf(msgBuf, sizeof(msgBuf),
           "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3414
// Event-log an outgoing request-vote reply (term and grant flag); the address shown is decoded
// from pMsg->destId. s is appended as free-form trailing text.
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3424
// Event-log a received request-vote reply (term and grant flag); the address shown is decoded
// from pMsg->srcId. s is appended as free-form trailing text.
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->voteGranted, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3434
// Event-log an outgoing append-entries message; the address shown is decoded from pMsg->destId.
// s is appended as free-form trailing text.
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3449
// Event-log a received append-entries message; the address shown is decoded from pMsg->srcId.
// s is appended as free-form trailing text.
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
  char logBuf[256];
  // prevLogIndex and commitIndex are signed log indices (the other log helpers in this file
  // print them with PRId64), so they must not be formatted as unsigned: an invalid index of -1
  // would otherwise show up as a huge positive number.
  snprintf(logBuf, sizeof(logBuf),
           "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", cmt:%" PRId64 ", pterm:%" PRIu64
           ", "
           "datalen:%d}, %s",
           host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
           pMsg->dataLen, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

wafwerar's avatar
wafwerar 已提交
3464
// Event-log an outgoing append-entries-batch message; the address shown is decoded from
// pMsg->destId. s is appended as free-form trailing text.
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3477
// Event-log a received append-entries-batch message; the address shown is decoded from
// pMsg->srcId. s is appended as free-form trailing text.
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
           ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
           peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
           pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

3490
// Event-log an outgoing append-entries reply; the address shown is decoded from pMsg->destId.
// s is appended as free-form trailing text.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
           ", success:%d, match:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

wafwerar's avatar
wafwerar 已提交
3502
// Event-log a received append-entries reply; the address shown is decoded from pMsg->srcId.
// s is appended as free-form trailing text.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64
           ", success:%d, match:%" PRId64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}
3513 3514 3515 3516 3517 3518 3519

// Event-log an outgoing heartbeat; the address shown is decoded from pMsg->destId.
// s is appended as free-form trailing text.
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "send sync-heartbeat to %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64
           ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

// Event-log a received heartbeat; the address shown is decoded from pMsg->srcId.
// s is appended as free-form trailing text.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf),
           "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64
           ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}

// Event-log an outgoing heartbeat reply; the address shown is decoded from pMsg->destId.
// s is appended as free-form trailing text.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
  char logBuf[256];
  // The printed address is the destination of the reply, so the message says "to" like every
  // other send log in this file (it previously said "from").
  snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply to %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           host, port, pMsg->term, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, logBuf);
}

// Event-log a received heartbeat reply; the address shown is decoded from pMsg->srcId.
// s is appended as free-form trailing text.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  char     msgBuf[256];
  char     peerHost[64];
  uint16_t peerPort;

  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
  snprintf(msgBuf, sizeof(msgBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
           peerHost, peerPort, pMsg->term, pMsg->privateTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}
3557 3558 3559 3560 3561 3562

// Event-log a received local command (numeric code, its name from syncLocalCmdGetStr, and the
// step-down term). s is appended as free-form trailing text.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  char        msgBuf[256];
  const char* cmdName = syncLocalCmdGetStr(pMsg->cmd);

  snprintf(msgBuf, sizeof(msgBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd,
           cmdName, pMsg->sdNewTerm, s);
  syncNodeEventLog(pSyncNode, msgBuf);
}