syncMain.c 79.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftCfg.h"
M
Minghao Li 已提交
27
#include "syncRaftLog.h"
M
Minghao Li 已提交
28
#include "syncRaftStore.h"
M
Minghao Li 已提交
29
#include "syncReplication.h"
M
Minghao Li 已提交
30 31
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
32
#include "syncRespMgr.h"
M
Minghao Li 已提交
33
#include "syncSnapshot.h"
M
Minghao Li 已提交
34
#include "syncTimeout.h"
M
Minghao Li 已提交
35
#include "syncUtil.h"
M
Minghao Li 已提交
36
#include "syncVoteMgr.h"
M
Minghao Li 已提交
37

M
Minghao Li 已提交
38 39 40 41 42
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
43
static void    syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
S
Shengliang Guan 已提交
44
static bool    syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
S
Shengliang Guan 已提交
45 46 47
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
M
Minghao Li 已提交
48

49
/*
 * Opens a sync node described by pSyncInfo and registers it with syncNodeAdd().
 * On success the node's ping/election/heartbeat timing and msgcb are copied from
 * pSyncInfo and the registration id (rid) is returned; on any failure -1 is returned.
 */
int64_t syncOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  if (pNode == NULL) {
    sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
    return -1;
  }

  pNode->rid = syncNodeAdd(pNode);
  if (pNode->rid < 0) {
    // registration failed: the node is not reachable by rid, so close it directly
    syncNodeClose(pNode);
    return -1;
  }

  pNode->msgcb = pSyncInfo->msgcb;

  pNode->pingBaseLine = pSyncInfo->pingMs;
  pNode->pingTimerMS = pSyncInfo->pingMs;

  pNode->electBaseLine = pSyncInfo->electMs;

  pNode->hbBaseLine = pSyncInfo->heartbeatMs;
  pNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;

  return pNode->rid;
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
void syncStart(int64_t rid) {
S
Shengliang Guan 已提交
72 73 74 75
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode != NULL) {
    syncNodeStart(pSyncNode);
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
76 77 78
  }
}

M
Minghao Li 已提交
79
void syncStop(int64_t rid) {
S
Shengliang Guan 已提交
80
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
81
  if (pSyncNode != NULL) {
S
Shengliang Guan 已提交
82
    syncNodeRelease(pSyncNode);
S
Shengliang Guan 已提交
83
    syncNodeRemove(rid);
M
Minghao Li 已提交
84
  }
S
Shengliang Guan 已提交
85
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
void syncPreStop(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
89 90 91 92
  if (pSyncNode != NULL) {
    syncNodePreClose(pSyncNode);
    syncNodeRelease(pSyncNode);
  }
M
Minghao Li 已提交
93 94
}

S
Shengliang Guan 已提交
95 96 97
/*
 * Accepts pCfg only if syncNodeInConfig() holds for it and the replica count differs
 * from the node's current replica count by at most one.
 */
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
  if (syncNodeInConfig(pSyncNode, pCfg)) {
    int delta = pCfg->replicaNum - pSyncNode->replicaNum;
    return delta >= -1 && delta <= 1;
  }
  return false;
}

S
Shengliang Guan 已提交
100
/*
 * Applies pNewCfg to the sync node registered under rid.
 *
 * The new config is validated with syncNodeCheckNewConfig(). If it is accepted, the
 * config index is updated and the config change is performed. When this node is the
 * leader, the per-peer heartbeat timers are also rebuilt and replication is triggered.
 *
 * Returns 0 on success. Returns -1 if rid is unknown, or if the config is rejected
 * (terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR in that case).
 */
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;

  if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
    terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
    // log while the reference is still held: pSyncNode must not be touched after release
    sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
    syncNodeRelease(pSyncNode);
    return -1;
  }

  syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
  syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    // re-target every per-peer heartbeat timer at the (possibly changed) replica ids
    syncNodeStopHeartbeatTimer(pSyncNode);

    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
    }

    syncNodeStartHeartbeatTimer(pSyncNode);
    syncNodeReplicate(pSyncNode);
  }

  syncNodeRelease(pSyncNode);
  return 0;
}
M
Minghao Li 已提交
128

S
Shengliang Guan 已提交
129 130 131 132
/*
 * Dispatches an incoming sync RPC message to the handler for its msgType on the
 * node registered under rid.
 *
 * Returns the handler's result. Returns -1 if the sync module is not initialized,
 * if rid is unknown, or if msgType is not a recognized sync message.
 */
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
  if (!syncIsInit()) return -1;

  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode == NULL) return -1;

  int32_t code;
  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT:
      code = syncNodeOnHeartbeat(pNode, pMsg);
      break;
    case TDMT_SYNC_HEARTBEAT_REPLY:
      code = syncNodeOnHeartbeatReply(pNode, pMsg);
      break;
    case TDMT_SYNC_TIMEOUT:
      code = syncNodeOnTimeout(pNode, pMsg);
      break;
    case TDMT_SYNC_CLIENT_REQUEST:
      code = syncNodeOnClientRequest(pNode, pMsg, NULL);
      break;
    case TDMT_SYNC_REQUEST_VOTE:
      code = syncNodeOnRequestVote(pNode, pMsg);
      break;
    case TDMT_SYNC_REQUEST_VOTE_REPLY:
      code = syncNodeOnRequestVoteReply(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES:
      code = syncNodeOnAppendEntries(pNode, pMsg);
      break;
    case TDMT_SYNC_APPEND_ENTRIES_REPLY:
      code = syncNodeOnAppendEntriesReply(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_SEND:
      code = syncNodeOnSnapshot(pNode, pMsg);
      break;
    case TDMT_SYNC_SNAPSHOT_RSP:
      code = syncNodeOnSnapshotReply(pNode, pMsg);
      break;
    case TDMT_SYNC_LOCAL_CMD:
      code = syncNodeOnLocalCmd(pNode, pMsg);
      break;
    default:
      sError("vgId:%d, failed to process msg:%p since invalid type:%s", pNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
      code = -1;
      break;
  }

  syncNodeRelease(pNode);
  return code;
}

S
Shengliang Guan 已提交
167
int32_t syncLeaderTransfer(int64_t rid) {
S
Shengliang Guan 已提交
168
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
S
Shengliang Guan 已提交
169
  if (pSyncNode == NULL) return -1;
170

S
Shengliang Guan 已提交
171
  int32_t ret = syncNodeLeaderTransfer(pSyncNode);
S
Shengliang Guan 已提交
172
  syncNodeRelease(pSyncNode);
173 174 175
  return ret;
}

M
Minghao Li 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
/*
 * Returns the smallest match index recorded in pMatchIndex over all peers,
 * or SYNC_INDEX_INVALID when the node has no peers.
 */
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
  SyncIndex lowest = SYNC_INDEX_INVALID;

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SyncIndex idx = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &pSyncNode->peersId[peer]);
    // the first peer seeds the minimum; later peers only lower it
    if (peer == 0 || idx < lowest) {
      lowest = idx;
    }
  }

  return lowest;
}

192
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
S
Shengliang Guan 已提交
193
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
194
  if (pSyncNode == NULL) {
195
    sError("sync begin snapshot error");
196 197
    return -1;
  }
198

199 200
  int32_t code = 0;

M
Minghao Li 已提交
201
  if (syncNodeIsMnode(pSyncNode)) {
M
Minghao Li 已提交
202 203 204
    // mnode
    int64_t logRetention = SYNC_MNODE_LOG_RETENTION;

M
Minghao Li 已提交
205 206 207
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    int64_t   logNum = endIndex - beginIndex;
M
Minghao Li 已提交
208 209 210
    bool      isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);

    if (isEmpty || (!isEmpty && logNum < logRetention)) {
S
Shengliang Guan 已提交
211 212
      sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
              lastApplyIndex, logNum, isEmpty);
S
Shengliang Guan 已提交
213
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
214 215 216
      return 0;
    }

M
Minghao Li 已提交
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
    goto _DEL_WAL;

  } else {
    // vnode
    if (pSyncNode->replicaNum > 1) {
      // multi replicas

      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);

        for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
          int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
          if (lastApplyIndex > matchIndex) {
            do {
              char     host[64];
              uint16_t port;
              syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
S
Shengliang Guan 已提交
234 235 236 237
              sNTrace(pSyncNode,
                      "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
                      " of %s:%d, do not delete wal",
                      lastApplyIndex, matchIndex, host, port);
M
Minghao Li 已提交
238 239
            } while (0);

S
Shengliang Guan 已提交
240
            syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
241 242 243 244 245 246
            return 0;
          }
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
        if (lastApplyIndex > pSyncNode->minMatchIndex) {
S
Shengliang Guan 已提交
247 248 249
          sNTrace(pSyncNode,
                  "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
                  lastApplyIndex, pSyncNode->minMatchIndex);
S
Shengliang Guan 已提交
250
          syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
251 252 253 254
          return 0;
        }

      } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
S
Shengliang Guan 已提交
255
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
256
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
257 258 259
        return 0;

      } else {
S
Shengliang Guan 已提交
260
        sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
S
Shengliang Guan 已提交
261
        syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
262 263 264 265 266 267 268 269 270
        return 0;
      }

      goto _DEL_WAL;

    } else {
      // one replica

      goto _DEL_WAL;
271 272 273
    }
  }

M
Minghao Li 已提交
274
_DEL_WAL:
275

M
Minghao Li 已提交
276
  do {
277 278 279 280
    SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);

    if (snapshottingIndex == SYNC_INDEX_INVALID) {
      atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
281
      pSyncNode->snapshottingTime = taosGetTimestampMs();
282

M
Minghao Li 已提交
283 284 285
      SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
      code = walBeginSnapshot(pData->pWal, lastApplyIndex);
      if (code == 0) {
S
Shengliang Guan 已提交
286 287
        sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
                pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
288
      } else {
S
Shengliang Guan 已提交
289
        sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
S
Shengliang Guan 已提交
290
                terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
M
Minghao Li 已提交
291 292
        atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
      }
293 294

    } else {
S
Shengliang Guan 已提交
295 296
      sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
              snapshottingIndex, lastApplyIndex);
297
    }
M
Minghao Li 已提交
298
  } while (0);
299

S
Shengliang Guan 已提交
300
  syncNodeRelease(pSyncNode);
301 302 303 304
  return code;
}

int32_t syncEndSnapshot(int64_t rid) {
S
Shengliang Guan 已提交
305
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
306
  if (pSyncNode == NULL) {
307
    sError("sync end snapshot error");
308 309 310
    return -1;
  }

311 312 313 314
  int32_t code = 0;
  if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
    SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
    code = walEndSnapshot(pData->pWal);
M
Minghao Li 已提交
315
    if (code != 0) {
316
      sNError(pSyncNode, "wal snapshot end error since:%s", terrstr());
S
Shengliang Guan 已提交
317
      syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
318 319
      return -1;
    } else {
S
Shengliang Guan 已提交
320
      sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex));
M
Minghao Li 已提交
321 322
      atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
    }
323
  }
324

S
Shengliang Guan 已提交
325
  syncNodeRelease(pSyncNode);
326 327 328
  return code;
}

M
Minghao Li 已提交
329
/* Steps the node registered under rid down to newTerm. Returns 0, or -1 if rid is unknown. */
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    syncNodeStepDown(pNode, newTerm);
    syncNodeRelease(pNode);
    return 0;
  }

  sError("sync step down error");
  return -1;
}

341 342 343
bool syncIsReadyForRead(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
344
    sError("sync ready for read error");
345 346 347 348 349 350 351 352 353 354
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
    syncNodeRelease(pSyncNode);
    return true;
  }

  bool ready = false;
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
355 356 357
    if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
      // apply queue not empty
      ready = false;
358

359 360 361 362
    } else {
      if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
        SSyncRaftEntry* pEntry = NULL;
        int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(
S
Shengliang Guan 已提交
363
                    pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
364 365 366 367 368 369 370
        if (code == 0 && pEntry != NULL) {
          if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
            ready = true;
          }

          syncEntryDestory(pEntry);
        }
371 372 373 374
      }
    }
  }

375 376 377 378 379 380 381 382
  if (!ready) {
    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
      terrno = TSDB_CODE_SYN_NOT_LEADER;
    } else {
      terrno = TSDB_CODE_APP_NOT_READY;
    }
  }

383 384 385 386
  syncNodeRelease(pSyncNode);
  return ready;
}

M
Minghao Li 已提交
387 388
/*
 * Transfers leadership from pSyncNode to its first peer.
 * Fails with terrno = TSDB_CODE_SYN_ONE_REPLICA when there are no peers. A node that
 * is not currently the leader returns 0 without doing anything.
 */
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
  if (pSyncNode->peersNum == 0) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  return syncNodeLeaderTransferTo(pSyncNode, pSyncNode->peersNodeInfo[0]);
}

/*
 * Proposes a leader-transfer entry naming newLeader through syncNodePropose().
 *
 * Fails with terrno = TSDB_CODE_SYN_ONE_REPLICA on a single-replica group. Returns -1
 * if the transfer message cannot be built; otherwise returns syncNodePropose()'s result.
 */
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
  if (pSyncNode->replicaNum == 1) {
    sDebug("only one replica, cannot leader transfer");
    terrno = TSDB_CODE_SYN_ONE_REPLICA;
    return -1;
  }

  sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);

  SRpcMsg rpcMsg = {0};
  // the original code ignored this result and then dereferenced rpcMsg.pCont unconditionally
  if (syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId) != 0 || rpcMsg.pCont == NULL) {
    sNError(pSyncNode, "failed to build leader transfer msg since %s", terrstr());
    return -1;
  }

  SyncLeaderTransfer* pMsg = rpcMsg.pCont;
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  return syncNodePropose(pSyncNode, &rpcMsg, false);
}

423 424
SSyncState syncGetState(int64_t rid) {
  SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
M
Minghao Li 已提交
425

S
Shengliang Guan 已提交
426
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
427 428 429 430
  if (pSyncNode != NULL) {
    state.state = pSyncNode->state;
    state.restored = pSyncNode->restoreFinish;
    syncNodeRelease(pSyncNode);
M
Minghao Li 已提交
431 432
  }

433
  return state;
M
Minghao Li 已提交
434 435
}

436
#if 0
/*
 * Disabled with #if 0: nothing in this region is compiled.
 * These are snapshot-metadata accessors keyed by rid.
 */

/* Fills pSnapshot with the term/config index of the log entry at index. */
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
  if (index < SYNC_INDEX_BEGIN) return -1;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  SSyncRaftEntry* pEntry = NULL;
  if (pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry) != 0) {
    if (pEntry != NULL) syncEntryDestory(pEntry);
    syncNodeRelease(pSyncNode);
    return -1;
  }
  ASSERT(pEntry != NULL);

  pSnapshot->data = NULL;
  pSnapshot->lastApplyIndex = index;
  pSnapshot->lastApplyTerm = pEntry->term;
  pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);

  syncEntryDestory(pEntry);
  syncNodeRelease(pSyncNode);
  return 0;
}

/* Reports the raft config's lastConfigIndex. */
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
  sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}

/* Reports the largest config index not exceeding snapshotIndex. */
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return -1;
  ASSERT(rid == pSyncNode->rid);

  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
  SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
  for (int32_t i = 1; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
    SyncIndex candidate = (pSyncNode->pRaftCfg->configIndexArr)[i];
    if (candidate > lastIndex && candidate <= snapshotIndex) lastIndex = candidate;
  }
  sMeta->lastConfigIndex = lastIndex;
  sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
         sMeta->lastConfigIndex);

  syncNodeRelease(pSyncNode);
  return 0;
}
#endif
507

508 509 510 511
/*
 * Returns the largest entry of the raft config's configIndexArr that does not exceed
 * snapshotLastApplyIndex. The first entry is used as a fallback when no later entry
 * qualifies. The array is expected to hold at least one entry (asserted).
 */
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);

  const SyncIndex* indexes = pSyncNode->pRaftCfg->configIndexArr;
  SyncIndex        best = indexes[0];

  // element 0 already seeds `best`, so the scan can start at 1
  for (int32_t k = 1; k < pSyncNode->pRaftCfg->configIndexCount; ++k) {
    SyncIndex candidate = indexes[k];
    if (candidate > best && candidate <= snapshotLastApplyIndex) {
      best = candidate;
    }
  }

  sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
         snapshotLastApplyIndex, best);
  return best;
}

S
Shengliang Guan 已提交
524
/*
 * Fills pEpSet with the endpoints of the raft config's replicas.
 *
 * inUse points at the replica after this node (wrapping around), so a retry goes to a
 * different node. If rid is unknown, pEpSet is left with numOfEps = 0. At most as many
 * replicas as pEpSet->eps can hold are copied.
 */
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;

  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) return;

  // never write past the fixed-size eps array, even if the config reports more replicas
  int32_t maxEps = (int32_t)(sizeof(pEpSet->eps) / sizeof(pEpSet->eps[0]));
  int32_t replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  if (replicaNum > maxEps) {
    sError("vgId:%d, sync get retry epset, replica num:%d exceeds max eps:%d", pSyncNode->vgId, replicaNum, maxEps);
    replicaNum = maxEps;
  }

  for (int32_t i = 0; i < replicaNum; ++i) {
    SEp* pEp = &pEpSet->eps[i];
    tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
    pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
    pEpSet->numOfEps++;
    sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
  }

  if (pEpSet->numOfEps > 0) {
    pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
  }

  sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
  syncNodeRelease(pSyncNode);
}
544

M
Minghao Li 已提交
545
/* Proposes pMsg on the node registered under rid; see syncNodePropose() for results. */
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
  SSyncNode* pNode = syncNodeAcquire(rid);
  if (pNode != NULL) {
    int32_t result = syncNodePropose(pNode, pMsg, isWeak);
    syncNodeRelease(pNode);
    return result;
  }

  sError("sync propose error");
  return -1;
}

557
/*
 * Proposes pMsg on an already-acquired sync node.
 *
 * Fails with terrno = TSDB_CODE_SYN_NOT_LEADER when the node is not the leader. Fails with
 * TSDB_CODE_SYN_PROPOSE_NOT_READY when it has not finished restoring; vgId 1 is exempt
 * from that check.
 *
 * Returns 1 when the message is handled directly through the optimized one-replica path;
 * pMsg->info.conn.applyIndex/applyTerm then carry the assigned index and current term.
 * Otherwise the message is wrapped as a client request, tracked in pSyncRespMgr and
 * enqueued, and the enqueue result is returned (0 on success). Returns -1 on the
 * failures above.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
    return -1;
  }

  // not restored, vnode enable
  if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
    sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
    return -1;
  }

  // optimized one replica
  if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
    // initialized so the failure log below never reads an indeterminate value
    SyncIndex retIndex = SYNC_INDEX_INVALID;
    int32_t   code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
    if (code != 0) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
             TMSG_INFO(pMsg->msgType));
      return -1;
    }

    pMsg->info.conn.applyIndex = retIndex;
    pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
    sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
           TMSG_INFO(pMsg->msgType));
    return 1;
  }

  SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
  uint64_t  seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
  SRpcMsg   rpcMsg = {0};
  int32_t   code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
    return -1;
  }

  sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
  code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
  if (code != 0) {
    sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
    // forget the response stub registered above since the request never reached the queue
    (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
  }

  return code;
}

S
Shengliang Guan 已提交
610
/*
 * Resets pSyncTimer to an unarmed per-peer heartbeat timer aimed at destId.
 * The timer uses the node's heartbeat base interval and syncNodeEqPeerHeartbeatTimer
 * as its callback. Always returns 0.
 */
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) {
  pSyncTimer->destId = destId;
  pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer;
  pSyncTimer->timerMS = pSyncNode->hbBaseLine;

  pSyncTimer->counter = 0;
  pSyncTimer->pTimer = NULL;
  atomic_store_64(&pSyncTimer->logicClock, 0);

  return 0;
}

S
Shengliang Guan 已提交
620
/*
 * Arms pSyncTimer. The callback receives a freshly allocated SSyncHbTimerData that
 * snapshots the node, the timer, its destination and its current logicClock.
 *
 * Returns 0 on success, and also returns 0 (after logging) when the sync environment is not
 * initialized. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the callback data cannot
 * be allocated.
 *
 * NOTE(review): a previous pSyncTimer->pData is overwritten without being freed. The free in
 * syncHbTimerStop is commented out, presumably because a pending callback may still use it.
 * Confirm the ownership of pData.
 */
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(*pData));
  if (pData == NULL) {
    // previously dereferenced unconditionally
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, start ctrl hb timer error since out of memory", pSyncNode->vgId);
    return -1;
  }

  pData->pSyncNode = pSyncNode;
  pData->pTimer = pSyncTimer;
  pData->destId = pSyncTimer->destId;
  pData->logicClock = pSyncTimer->logicClock;

  pSyncTimer->pData = pData;
  taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
  return 0;
}

S
Shengliang Guan 已提交
637
// Stop a per-peer heartbeat timer.
// The logic clock is advanced before stopping. Presumably this lets a callback that
// already captured the old clock value recognise itself as stale — confirm in
// syncNodeEqPeerHeartbeatTimer.
// Always returns 0.
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
  atomic_add_fetch_64(&pSyncTimer->logicClock, 1);

  taosTmrStop(pSyncTimer->pTimer);
  pSyncTimer->pTimer = NULL;

  // NOTE(review): pSyncTimer->pData is deliberately not released here (the free was
  // disabled upstream); the timer callback presumably owns it — confirm.
  return 0;
}

S
Shengliang Guan 已提交
646 647
// Create and initialise a sync node from pSyncInfo.
// The raft config file <path>/raft_config.json is created if absent. Otherwise it is
// reconciled with the config passed in pSyncInfo->syncCfg (the input wins when it is
// non-empty and differs). After that the raft store, vote/index managers, log store,
// timers, response manager and snapshot senders/receiver are built.
// Ownership of pSyncInfo->pFsm moves to the node (pSyncInfo->pFsm is set to NULL).
// On any failure the FSM still held by pSyncInfo is freed, the partially built node is
// released via syncNodeClose(), and NULL is returned.
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
  if (pSyncNode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Set vgId first so every log line below reports the real group instead of 0.
  pSyncNode->vgId = pSyncInfo->vgId;

  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      goto _error;
    }
  }

  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
    // create a new raft config file
    SRaftCfgMeta meta = {0};
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
    meta.batchSize = pSyncInfo->batchSize;
    if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
      sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }
    if (pSyncInfo->syncCfg.replicaNum == 0) {
      // The file was just created from pSyncInfo->syncCfg, so that struct already is the
      // config on disk. pSyncNode->pRaftCfg has not been opened yet (it is NULL here) and
      // must not be read from.
      sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
    }
  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    if (pSyncNode->pRaftCfg == NULL) {
      sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
      goto _error;
    }

    if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
      sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
      pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
      if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
        sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
        goto _error;
      }
    } else {
      sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
      pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
    }

    raftCfgClose(pSyncNode->pRaftCfg);
    pSyncNode->pRaftCfg = NULL;
  }

  // init by SSyncInfo
  SSyncCfg* pCfg = &pSyncInfo->syncCfg;
  sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo* pNode = &pCfg->nodeInfo[i];
    sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
           TD_DIRSEP);
  // configPath was already built above and has not changed since.

  pSyncNode->pWal = pSyncInfo->pWal;
  pSyncNode->msgcb = pSyncInfo->msgcb;
  pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
  pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
  pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;

  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  if (pSyncNode->pRaftCfg == NULL) {
    sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    goto _error;
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
    sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
    goto _error;
  }

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int32_t j = 0;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
      sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
      goto _error;
    }
  }

  // init raft algorithm
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncInfo->pFsm = NULL;
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

  // init life cycle outside

  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

  // init TLA+ server vars
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
  if (pSyncNode->pRaftStore == NULL) {
    sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
    goto _error;
  }

  // init TLA+ candidate vars
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  if (pSyncNode->pVotesGranted == NULL) {
    sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  if (pSyncNode->pVotesRespond == NULL) {
    sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pNextIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  if (pSyncNode->pMatchIndex == NULL) {
    sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
    goto _error;
  }

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  if (pSyncNode->pLogStore == NULL) {
    sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
    goto _error;
  }

  // The commit index starts at the FSM's last applied snapshot index, when one exists.
  SyncIndex commitIndex = SYNC_INDEX_INVALID;
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    SSnapshot snapshot = {0};
    int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
    if (code != 0) {
      sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
      goto _error;
    }
    if (snapshot.lastApplyIndex > commitIndex) {
      commitIndex = snapshot.lastApplyIndex;
      sNTrace(pSyncNode, "reset commit index by snapshot");
    }
  }
  pSyncNode->commitIndex = commitIndex;

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

  // init ping timer
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
  pSyncNode->pingTimerCounter = 0;

  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

  // init peer heartbeat timer
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
  }

  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
  if (pSyncNode->pSyncRespMgr == NULL) {
    sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
    goto _error;
  }

  // restore state
  pSyncNode->restoreFinish = false;

  // snapshot senders
  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);

  // is config changing
  pSyncNode->changing = false;

  // peer state
  syncNodePeerStateInit(pSyncNode);

  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  // The raft role itself is started later, in syncNodeStart().

  int64_t timeNow = taosGetTimestampMs();
  pSyncNode->startTime = timeNow;
  pSyncNode->leaderTime = timeNow;
  pSyncNode->lastReplicateTime = timeNow;

  // snapshotting
  atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);

  sNTrace(pSyncNode, "sync open");

  return pSyncNode;

_error:
  // If ownership of the FSM was not yet transferred to the node, free it here;
  // otherwise syncNodeClose() frees pSyncNode->pFsm.
  if (pSyncInfo->pFsm) {
    taosMemoryFree(pSyncInfo->pFsm);
    pSyncInfo->pFsm = NULL;
  }
  syncNodeClose(pSyncNode);
  pSyncNode = NULL;
  return NULL;
}

M
Minghao Li 已提交
934 935 936 937 938 939 940 941 942 943 944
// Raise pSyncNode->commitIndex to the FSM snapshot's lastApplyIndex if that is larger.
// This does nothing when the node has no FSM or the FSM offers no FpGetSnapshotInfo.
// If the snapshot query fails, commitIndex is left untouched. The snapshot struct is
// zero-initialised, and the error path returns early instead of relying solely on
// ASSERT, which may be compiled out.
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}

M
Minghao Li 已提交
945 946
// Start the raft role of an opened node, then arm its ping timer.
// A multi-replica group starts every node as a follower. A single-replica group has no
// one to elect it, so the node bumps its term, becomes leader immediately, and appends
// a no-op entry so that entries from earlier terms can be committed (Raft 3.6.2).
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);
  }

  int32_t pingRet = syncNodeStartPingTimer(pSyncNode);
  ASSERT(pingRet == 0);
}

M
Minghao Li 已提交
964 965 966 967 968 969 970 971 972
// Start a node in stand-by mode.
// The node is put in the follower state with its heartbeat timers stopped. Its election
// timer is re-armed with the maximum timeout (TIMER_MAX_MS), which effectively keeps it
// from timing out into an election. Finally the ping timer is started.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  const int32_t longElectMS = TIMER_MAX_MS;
  int32_t       code = syncNodeRestartElectTimer(pSyncNode, longElectMS);
  ASSERT(code == 0);

  code = syncNodeStartPingTimer(pSyncNode);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
979 980 981 982 983 984 985 986
// First phase of shutdown.
// Stops the election timer and then the heartbeat timers, so the node stops initiating
// elections and heartbeats before syncNodeClose() tears it down.
void syncNodePreClose(SSyncNode* pSyncNode) {
  syncNodeStopElectTimer(pSyncNode);      // no more election timeouts
  syncNodeStopHeartbeatTimer(pSyncNode);  // no more heartbeats to peers
}

M
Minghao Li 已提交
987
// Release everything owned by pSyncNode and free the node itself. NULL is accepted and
// ignored.
// syncNodeOpen's error path also calls this on a partially built node.
// NOTE(review): that relies on each *Close/*Destroy helper tolerating NULL — confirm.
// Timers are stopped before any state is destroyed, so a timer firing during shutdown
// cannot observe a half-torn-down node. (Previously they were stopped only after the
// stores and config had already been released.)
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  sNTrace(pSyncNode, "sync close");

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  ASSERT(ret == 0);
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  taosMemoryFree(pSyncNode);
}

M
Minghao Li 已提交
1037
// Snapshot strategy recorded in this node's raft config (requires pRaftCfg to be open).
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) {
  return pSyncNode->pRaftCfg->snapshotStrategy;
}
M
Minghao Li 已提交
1038

M
Minghao Li 已提交
1039 1040 1041
// timer control --------------
// (Re)arm the node's ping timer with pingTimerMS and publish the user-side logic clock
// as the active one.
// If the sync env is not initialised, an error is logged and nothing is armed.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer.
// The user-side logic clock is advanced first. Presumably this makes an in-flight
// callback see itself as stale — confirm in syncNodeEqPingTimer.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

// Arm the election timer to fire after `ms` milliseconds.
// A fresh SElectTimer carrying the current election logic clock is allocated and passed
// to the callback as its parameter.
// Returns 0 on success, and also when the sync env is not initialised (an error is
// logged in that case).
// Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the callback data cannot be
// allocated; previously the allocation result was dereferenced unchecked.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  if (!syncIsInit()) {
    sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
    return 0;
  }

  pSyncNode->electTimerMS = ms;

  SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
  if (pElectTimer == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, failed to start elect timer since out of memory", pSyncNode->vgId);
    return -1;
  }
  pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
  pElectTimer->pSyncNode = pSyncNode;
  pElectTimer->pData = NULL;

  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

// Stop the election timer.
// The election logic clock is advanced first. Presumably this lets a callback holding
// an older clock value recognise itself as stale — confirm in syncNodeEqElectTimer.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);

  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;

  return 0;
}

// Stop the election timer and re-arm it to fire after `ms` milliseconds.
// Returns the result of syncNodeStartElectTimer(). Previously that result was discarded
// and 0 was always returned, hiding start failures from callers.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  return syncNodeStartElectTimer(pSyncNode, ms);
}

M
Minghao Li 已提交
1095 1096
// Re-arm the election timer with a fresh timeout.
// Stand-by nodes use TIMER_MAX_MS. Other nodes draw a random value between
// electBaseLine and 2 * electBaseLine via syncUtilElectRandomMS().
// Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, electMS);

  sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
          electMS);
  return code;
}

M
Minghao Li 已提交
1111
// Arm the node-wide heartbeat timer with heartbeatTimerMS and publish the user-side
// heartbeat logic clock as the active one.
// Logs an error instead if the sync env is not initialised. The trace line is emitted
// in both cases. Always returns 0.
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (!syncIsInit()) {
    sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
    atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }

  sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
  return 0;
}

M
Minghao Li 已提交
1125
// Start the per-peer heartbeat timer of every current peer.
// Peers without a registered timer are skipped. The old node-wide heartbeat timer is no
// longer started here; it was already disabled via #if 0.
// Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStart(pSyncNode, pPeerTimer);
  }
  return 0;
}

M
Minghao Li 已提交
1143 1144
// Stop the per-peer heartbeat timer of every current peer.
// Peers without a registered timer are skipped. The old node-wide heartbeat timer is no
// longer stopped here; it was already disabled via #if 0.
// Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SSyncTimer* pPeerTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[peer]));
    if (pPeerTimer == NULL) {
      continue;
    }
    syncHbTimerStop(pSyncNode, pPeerTimer);
  }
  return 0;
}

1162 1163 1164 1165 1166 1167
// Stop and then re-start all per-peer heartbeat timers.
// Results of the stop/start calls are ignored; always returns 0.
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
  (void)syncNodeStopHeartbeatTimer(pSyncNode);
  (void)syncNodeStartHeartbeatTimer(pSyncNode);
  return 0;
}

M
Minghao Li 已提交
1168 1169 1170
// utils --------------
// Send pMsg to the replica identified by destRaftId, as a no-response RPC.
// The message body is converted with syncUtilMsgHtoN() before sending (host-to-network
// byte order, per the helper's name).
// Returns 0 once handed to syncSendMSg. Returns -1 if no send callback is configured;
// in that case pMsg->pCont is freed here.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilRaftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the node described by nodeInfo, as a no-response RPC.
// The message body is converted with syncUtilMsgHtoN() before sending.
// Returns 0 once handed to syncSendMSg.
// If no send callback is configured, pMsg->pCont is freed and -1 is returned, matching
// syncNodeSendMsgById(). Previously the content leaked and the call reported success.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilNodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->syncSendMSg == NULL) {
    sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
    rpcFreeCont(pMsg->pCont);
    return -1;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->syncSendMSg(&epSet, pMsg);
  return 0;
}

1202
// Report whether this node is a member of `config`.
// Membership is checked two ways: by fqdn/port against myNodeInfo, and by a raft id
// built from each entry against myRaftId. The two answers are asserted to agree, and
// the fqdn/port answer is returned.
inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
  bool foundByEp = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundByEp; ++idx) {
    const SNodeInfo* pInfo = &(config->nodeInfo)[idx];
    foundByEp = strcmp(pInfo->nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
                pInfo->nodePort == pSyncNode->myNodeInfo.nodePort;
  }

  bool foundById = false;
  for (int32_t idx = 0; idx < config->replicaNum && !foundById; ++idx) {
    SRaftId candidate;
    candidate.addr = syncUtilAddr2U64((config->nodeInfo)[idx].nodeFqdn, (config->nodeInfo)[idx].nodePort);
    candidate.vgId = pSyncNode->vgId;
    if (syncUtilSameId(&candidate, &(pSyncNode->myRaftId))) {
      foundById = true;
    }
  }

  ASSERT(foundByEp == foundById);
  return foundByEp;
}

1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241
// True when the two configs differ in any of: replica count, own index, or any
// replica's fqdn/port at the same position. Replica order therefore matters.
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
  if (pOldCfg->replicaNum != pNewCfg->replicaNum || pOldCfg->myIndex != pNewCfg->myIndex) {
    return true;
  }

  for (int32_t idx = 0; idx < pOldCfg->replicaNum; ++idx) {
    const SNodeInfo* pBefore = &pOldCfg->nodeInfo[idx];
    const SNodeInfo* pAfter = &pNewCfg->nodeInfo[idx];
    if (pBefore->nodePort != pAfter->nodePort || strcmp(pBefore->nodeFqdn, pAfter->nodeFqdn) != 0) {
      return true;
    }
  }
  return false;
}

M
Minghao Li 已提交
1242
// Apply pNewConfig (committed at lastConfigChangeIndex) to the node.
// This does nothing if the config is unchanged. Otherwise the raft config is updated,
// the standby flag is adjusted, and the config index is recorded. If this node is part
// of the new config, the member ids, quorum, index/vote managers and snapshot senders
// are also rebuilt. The config is persisted in both branches.
//
// Senders of replicas that survive the change are reused; the rest are recreated and
// the leftovers destroyed.
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
    sInfo("vgId:%d, sync not reconfig since not changed", pSyncNode->vgId);
    return;
  }

  pSyncNode->pRaftCfg->cfg = *pNewConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
  bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
  bool isDrop = IamInOld && !IamInNew;

  // log begin config change (each config is rendered into its own buffer)
  char oldCfgStr[1024] = {0};
  char newCfgStr[1024] = {0};
  syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
  syncCfg2SimpleStr(pNewConfig, newCfgStr, sizeof(newCfgStr));
  sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, newCfgStr);

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  if (isDrop) {
    pSyncNode->pRaftCfg->isStandBy = 1;  // set standby
  }

  // add last config index
  raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);

  if (IamInNew) {
    // save snapshot senders
    int32_t oldReplicaNum = pSyncNode->replicaNum;
    SRaftId oldReplicasId[TSDB_MAX_REPLICA];
    memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
    SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      oldSenders[i] = (pSyncNode->senders)[i];
      sSTrace(oldSenders[i], "snapshot sender save old");
    }

    // init internal
    pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
    syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

    // init peersNum, peers, peersId
    pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
    int32_t peerIdx = 0;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
        pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
        peerIdx++;
      }
    }
    for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
    }

    // init replicaNum, replicasId
    pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
    for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
      syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
    }

    // update quorum first
    pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

    syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
    syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
    voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
    votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

    // reset snapshot senders: clear new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      (pSyncNode->senders)[i] = NULL;
    }

    // Reuse the old sender of every replica that is still a member.
    // Only the first oldReplicaNum entries of oldReplicasId are live; later slots may hold
    // stale ids. Scanning them could match twice and orphan a sender. At most one match is
    // taken per replica, and NULL senders are skipped.
    for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
      for (int32_t oldIdx = 0; oldIdx < oldReplicaNum; ++oldIdx) {
        if (oldSenders[oldIdx] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[oldIdx])) {
          continue;
        }

        char     host[128];
        uint16_t port;
        syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
        sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
                (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[oldIdx]);

        (pSyncNode->senders)[i] = oldSenders[oldIdx];
        oldSenders[oldIdx] = NULL;
        bool reset = true;

        // reset replicaIndex
        int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
        (pSyncNode->senders)[i]->replicaIndex = i;

        sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
                i, host, port, (pSyncNode->senders)[i], reset);
        break;
      }
    }

    // create new
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if ((pSyncNode->senders)[i] == NULL) {
        (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
        sSTrace((pSyncNode->senders)[i], "snapshot sender create new");
      }
    }

    // free old (log before destroying so a live pointer is printed)
    for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
      if (oldSenders[i] != NULL) {
        sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
        snapshotSenderDestroy(oldSenders[i]);
        oldSenders[i] = NULL;
      }
    }

    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }

    char tmpbuf[1024] = {0};
    snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%" PRId64 ", %s  -->  %s",
             oldConfig.replicaNum, pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);

    // change isStandBy to normal (election timeout)
    if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
      syncNodeBecomeLeader(pSyncNode, tmpbuf);

      // Raft 3.6.2 Committing entries from previous terms
      syncNodeAppendNoop(pSyncNode);
      syncMaybeAdvanceCommitIndex(pSyncNode);
    } else {
      syncNodeBecomeFollower(pSyncNode, tmpbuf);
    }
  } else {
    // persist cfg
    if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
      sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
    }
    sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s  -->  %s", oldConfig.replicaNum,
            pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
  }

  // log end config change
  sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
}

M
Minghao Li 已提交
1412 1413 1414 1415
// raft state change --------------

/*
 * Adopt a strictly newer term: store it in the raft store, become follower
 * and clear the vote cast in the previous term.
 * Terms that are not newer than the current one are ignored.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);

  char reason[64];
  snprintf(reason, sizeof(reason), "update term to %" PRId64, term);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1423 1424 1425 1426 1427 1428
/* Raise the current term to `term` when it is newer; role and vote are left untouched. */
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) return;

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
}

M
Minghao Li 已提交
1429
/*
 * Step down in response to a message that carries `newTerm`.
 *   newTerm <  current term: ignored (trace only)
 *   newTerm == current term: become follower unless already one
 *   newTerm >  current term: adopt it, become follower, clear the vote
 */
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
  SyncTerm curTerm = pSyncNode->pRaftStore->currentTerm;

  if (curTerm > newTerm) {
    sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);
    return;
  }

  sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, curTerm);

  if (curTerm == newTerm) {
    if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
      syncNodeBecomeFollower(pSyncNode, "step down");
    }
    return;
  }

  // strictly newer term
  raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);

  char reason[64];
  snprintf(reason, sizeof(reason), "step down, update term to %" PRId64, newTerm);
  syncNodeBecomeFollower(pSyncNode, reason);

  raftStoreClearVote(pSyncNode->pRaftStore);
}

1455 1456
/* Clean all pending client responses held by the response manager (used when becoming follower). */
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) {
  syncRespCleanRsp(pSyncNode->pSyncRespMgr);
}

1457
/*
 * Switch this node to the follower role.
 *
 * The leader cache is cleared only if this node was the leader *before* the
 * state is overwritten, so that check must stay first.
 * debugStr is only used in the trace log.
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // role change: followers do not send heartbeats but do run the election timer
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);

  // send rsp to client
  syncNodeLeaderChangeRsp(pSyncNode);

  // notify the state machine, if it registered interest
  bool hasCb = (pSyncNode->pFsm != NULL) && (pSyncNode->pFsm->FpBecomeFollowerCb != NULL);
  if (hasCb) {
    pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
  }

  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

  sNTrace(pSyncNode, "become follower %s", debugStr);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1503
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
1504 1505
  pSyncNode->leaderTime = taosGetTimestampMs();

1506 1507 1508
  // reset restoreFinish
  pSyncNode->restoreFinish = false;

M
Minghao Li 已提交
1509
  // state change
M
Minghao Li 已提交
1510
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1511 1512

  // set leader cache
M
Minghao Li 已提交
1513 1514
  pSyncNode->leaderCache = pSyncNode->myRaftId;

S
Shengliang Guan 已提交
1515
  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1516 1517
    // maybe overwrite myself, no harm
    // just do it!
1518 1519 1520 1521 1522 1523 1524 1525 1526

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1527 1528
  }

S
Shengliang Guan 已提交
1529
  for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1530 1531
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1532 1533 1534
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
1535 1536 1537
  // init peer mgr
  syncNodePeerStateInit(pSyncNode);

M
Minghao Li 已提交
1538
#if 0
1539 1540
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1541
  if (pMySender != NULL) {
S
Shengliang Guan 已提交
1542
    for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
1543 1544 1545
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
1546
    }
1547
    (pMySender->privateTerm) += 100;
1548
  }
M
Minghao Li 已提交
1549
#endif
1550

1551 1552 1553 1554 1555
  // close receiver
  if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
    snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
  }

M
Minghao Li 已提交
1556
  // stop elect timer
M
Minghao Li 已提交
1557
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1558

M
Minghao Li 已提交
1559 1560
  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1561

M
Minghao Li 已提交
1562 1563
  // send heartbeat right now
  syncNodeHeartbeatPeers(pSyncNode);
M
Minghao Li 已提交
1564

1565 1566 1567 1568 1569
  // call back
  if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
    pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
1570 1571 1572
  // min match index
  pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;

M
Minghao Li 已提交
1573
  // trace log
S
Shengliang Guan 已提交
1574
  sNTrace(pSyncNode, "become leader %s", debugStr);
M
Minghao Li 已提交
1575 1576 1577
}

/*
 * Promote a candidate that holds a majority of granted votes.
 *
 * After becoming leader it appends a no-op of the new term (Raft thesis
 * §3.6.2: a leader commits entries from previous terms only by committing
 * an entry of its own term). It then tries to advance the commit index, and
 * replicates right away when there are other replicas.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);

  bool hasPeers = pSyncNode->replicaNum > 1;
  if (hasPeers) {
    syncNodeReplicate(pSyncNode);
  }
}

M
Minghao Li 已提交
1593 1594
/* True when this sync node belongs to vgroup 1, which this file treats as the mnode group. */
bool syncNodeIsMnode(SSyncNode* pSyncNode) {
  return pSyncNode->vgId == 1;
}

M
Minghao Li 已提交
1595
/* Reset per-peer send bookkeeping (last index/time) in all TSDB_MAX_REPLICA slots. Always returns 0. */
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
  int32_t slot = 0;
  while (slot < TSDB_MAX_REPLICA) {
    pSyncNode->peerStates[slot].lastSendIndex = SYNC_INDEX_INVALID;
    pSyncNode->peerStates[slot].lastSendTime = 0;
    ++slot;
  }
  return 0;
}

/* Follower -> candidate: asserts the current role and only flips the state field. */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;

  sNTrace(pSyncNode, "follower to candidate");
}

/* Leader -> follower: asserts the current role and delegates to syncNodeBecomeFollower. */
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  const char* reason = "leader to follower";
  syncNodeBecomeFollower(pSyncNode, reason);

  sNTrace(pSyncNode, "leader to follower");
}

/* Candidate -> follower: asserts the current role and delegates to syncNodeBecomeFollower. */
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  const char* reason = "candidate to follower";
  syncNodeBecomeFollower(pSyncNode, reason);

  sNTrace(pSyncNode, "candidate to follower");
}

// raft vote --------------
M
Minghao Li 已提交
1623 1624 1625

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1626
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
M
Minghao Li 已提交
1627 1628
  ASSERT(term == pSyncNode->pRaftStore->currentTerm);
  ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1629 1630 1631 1632

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1633
// simulate get vote from outside
M
Minghao Li 已提交
1634
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
S
Shengliang Guan 已提交
1635
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
M
Minghao Li 已提交
1636

S
Shengliang Guan 已提交
1637 1638
  SRpcMsg rpcMsg = {0};
  int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
S
Shengliang Guan 已提交
1639
  if (ret != 0) return;
S
Shengliang Guan 已提交
1640 1641

  SyncRequestVoteReply* pMsg = rpcMsg.pCont;
M
Minghao Li 已提交
1642 1643 1644 1645 1646 1647 1648
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
S
Shengliang Guan 已提交
1649
  rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1650 1651
}

M
Minghao Li 已提交
1652
// return if has a snapshot
M
Minghao Li 已提交
1653 1654
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
1655
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1656 1657
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1658 1659 1660 1661 1662 1663 1664
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

M
Minghao Li 已提交
1665 1666
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
1667
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
M
Minghao Li 已提交
1668
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1669 1670
  if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
    pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1671 1672 1673 1674 1675 1676 1677
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

M
Minghao Li 已提交
1678 1679
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
M
Minghao Li 已提交
1680 1681
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1682 1683
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
1684
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
1685 1686
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1687 1688
    }

M
Minghao Li 已提交
1689 1690 1691
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1692 1693 1694 1695
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1696
  } else {
M
Minghao Li 已提交
1697 1698
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1699
  }
M
Minghao Li 已提交
1700

M
Minghao Li 已提交
1701 1702 1703 1704 1705 1706 1707
  return lastTerm;
}

// get last index and term along with snapshot
// Writes both outputs; always returns 0.
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIdx = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTrm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIdx;
  *pLastTerm = lastTrm;
  return 0;
}
M
Minghao Li 已提交
1710

M
Minghao Li 已提交
1711
// return append-entries first try index
M
Minghao Li 已提交
1712 1713 1714 1715 1716
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1717 1718
// if index > 0, return index - 1
// else, return -1
1719 1720 1721 1722 1723 1724 1725 1726 1727
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  SyncIndex preIndex = index - 1;
  if (preIndex < SYNC_INDEX_INVALID) {
    preIndex = SYNC_INDEX_INVALID;
  }

  return preIndex;
}

M
Minghao Li 已提交
1728 1729 1730 1731
// if index < 0, return SYNC_TERM_INVALID
// if index == 0, return 0
// if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  if (index < SYNC_INDEX_BEGIN) {
    return SYNC_TERM_INVALID;
  }

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  SyncTerm        preTerm = 0;
  SyncIndex       preIndex = index - 1;
  SSyncRaftEntry* pPreEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
M
Minghao Li 已提交
1745 1746 1747 1748 1749 1750

  SSnapshot snapshot = {.data = NULL,
                        .lastApplyIndex = SYNC_INDEX_INVALID,
                        .lastApplyTerm = SYNC_TERM_INVALID,
                        .lastConfigIndex = SYNC_INDEX_INVALID};

1751 1752 1753 1754 1755 1756
  if (code == 0) {
    ASSERT(pPreEntry != NULL);
    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
    return preTerm;
  } else {
1757 1758 1759 1760
    if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
      pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
      if (snapshot.lastApplyIndex == preIndex) {
        return snapshot.lastApplyTerm;
1761 1762 1763 1764
      }
    }
  }

1765
  sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64,
S
Shengliang Guan 已提交
1766
          index, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
1767 1768
  return SYNC_TERM_INVALID;
}
M
Minghao Li 已提交
1769 1770 1771 1772

// get pre index and term of "index".
// Always returns 0; a lookup failure shows up as SYNC_TERM_INVALID in *pPreTerm.
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIdx = syncNodeGetPreIndex(pSyncNode, index);
  SyncTerm  preTrm = syncNodeGetPreTerm(pSyncNode, index);

  *pPreIndex = preIdx;
  *pPreTerm = preTrm;
  return 0;
}

M
Minghao Li 已提交
1777
static void syncNodeEqPingTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1778 1779 1780 1781 1782
  if (!syncIsInit()) return;

  SSyncNode* pNode = param;
  if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
    SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1783
    int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
S
Shengliang Guan 已提交
1784 1785 1786 1787 1788
                                    pNode->pingTimerMS, pNode);
    if (code != 0) {
      sNError(pNode, "failed to build ping msg");
      rpcFreeCont(rpcMsg.pCont);
      return;
M
Minghao Li 已提交
1789
    }
M
Minghao Li 已提交
1790

S
Shengliang Guan 已提交
1791 1792 1793 1794 1795 1796
    sNTrace(pNode, "enqueue ping msg");
    code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
    if (code != 0) {
      sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
      rpcFreeCont(rpcMsg.pCont);
      return;
1797 1798
    }

S
Shengliang Guan 已提交
1799
    taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
M
Minghao Li 已提交
1800
  } else {
1801
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
S
Shengliang Guan 已提交
1802
           pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1803 1804 1805 1806
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1807 1808 1809 1810 1811 1812
  if (!syncIsInit()) return;

  SElectTimer* pElectTimer = param;
  SSyncNode*   pNode = pElectTimer->pSyncNode;

  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1813
  int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
S
Shengliang Guan 已提交
1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829

  if (code != 0) {
    sNError(pNode, "failed to build elect msg");
    taosMemoryFree(pElectTimer);
    return;
  }

  SyncTimeout* pTimeout = rpcMsg.pCont;
  sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);

  code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
    rpcFreeCont(rpcMsg.pCont);
    taosMemoryFree(pElectTimer);
    return;
M
Minghao Li 已提交
1830
  }
M
Minghao Li 已提交
1831 1832

  taosMemoryFree(pElectTimer);
M
Minghao Li 已提交
1833

M
Minghao Li 已提交
1834
#if 0
M
Minghao Li 已提交
1835
  // reset timer ms
S
Shengliang Guan 已提交
1836 1837 1838
  if (syncIsInit() && pNode->electBaseLine > 0) {
    pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
    taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
M
Minghao Li 已提交
1839 1840
  } else {
    sError("sync env is stop, syncNodeEqElectTimer");
M
Minghao Li 已提交
1841
  }
M
Minghao Li 已提交
1842
#endif
M
Minghao Li 已提交
1843 1844
}

M
Minghao Li 已提交
1845
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
S
Shengliang Guan 已提交
1846
  if (!syncIsInit()) return;
1847

S
Shengliang Guan 已提交
1848 1849 1850 1851
  SSyncNode* pNode = param;
  if (pNode->replicaNum > 1) {
    if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1852
      int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
S
Shengliang Guan 已提交
1853 1854 1855 1856 1857
                                      pNode->heartbeatTimerMS, pNode);

      if (code != 0) {
        sNError(pNode, "failed to build heartbeat msg");
        return;
1858
      }
M
Minghao Li 已提交
1859

S
Shengliang Guan 已提交
1860 1861 1862 1863 1864 1865
      sNTrace(pNode, "enqueue heartbeat timer");
      code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
      if (code != 0) {
        sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
        rpcFreeCont(rpcMsg.pCont);
        return;
1866
      }
S
Shengliang Guan 已提交
1867 1868 1869 1870

      taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
                   &pNode->pHeartbeatTimer);

1871
    } else {
S
Shengliang Guan 已提交
1872 1873
      sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
             pNode->heartbeatTimerLogicClock, pNode->heartbeatTimerLogicClockUser);
1874
    }
M
Minghao Li 已提交
1875 1876 1877
  }
}

1878 1879 1880 1881 1882
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
  SSyncHbTimerData* pData = (SSyncHbTimerData*)param;
  SSyncNode*        pSyncNode = pData->pSyncNode;
  SSyncTimer*       pSyncTimer = pData->pTimer;

M
Minghao Li 已提交
1883 1884 1885 1886
  if (pSyncNode == NULL) {
    return;
  }

M
Minghao Li 已提交
1887 1888 1889 1890
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    return;
  }

M
Minghao Li 已提交
1891 1892 1893 1894
  if (pSyncNode->pRaftStore == NULL) {
    return;
  }

S
Shengliang Guan 已提交
1895
  // sNTrace(pSyncNode, "eq peer hb timer");
1896 1897 1898 1899 1900 1901

  int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
  int64_t msgLogicClock = atomic_load_64(&pData->logicClock);

  if (pSyncNode->replicaNum > 1) {
    if (timerLogicClock == msgLogicClock) {
1902
      SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
1903 1904 1905
      (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);

      SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
1906 1907 1908 1909
      pSyncMsg->srcId = pSyncNode->myRaftId;
      pSyncMsg->destId = pData->destId;
      pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
      pSyncMsg->commitIndex = pSyncNode->commitIndex;
M
Minghao Li 已提交
1910
      pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
1911 1912 1913
      pSyncMsg->privateTerm = 0;

      // send msg
S
Shengliang Guan 已提交
1914
      syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
1915

S
Shengliang Guan 已提交
1916 1917
      if (syncIsInit()) {
        taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
1918 1919 1920 1921 1922 1923
                     &pSyncTimer->pTimer);
      } else {
        sError("sync env is stop, syncNodeEqHeartbeatTimer");
      }

    } else {
1924
      sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock,
1925 1926 1927 1928 1929
             msgLogicClock);
    }
  }
}

1930 1931 1932 1933 1934
// Propose a no-op entry by enqueueing it as a client request.
// Only a leader may propose. Otherwise terrno = TSDB_CODE_SYN_NOT_LEADER and -1 is returned.
// Returns -1 if the entry cannot be built. Otherwise returns the code of building the
// request or of enqueueing it. On any failure the built message is freed here.
static int32_t syncNodeEqNoop(SSyncNode* pNode) {
  // The check used to be inverted (it rejected the leader while reporting
  // SYN_NOT_LEADER), so a leader could never propose a noop here.
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return -1;
  }

  SyncIndex       index = pNode->pLogStore->syncLogWriteIndex(pNode->pLogStore);
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, pNode->vgId);
  if (pEntry == NULL) return -1;

  SRpcMsg rpcMsg = {0};
  int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
  syncEntryDestory(pEntry);

  // do not enqueue a message that failed to build
  if (code != 0) {
    sNError(pNode, "failed to build noop client request");
    rpcFreeCont(rpcMsg.pCont);
    return code;
  }

  sNTrace(pNode, "propose msg, type:noop");
  code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
  if (code != 0) {
    sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
    // the queue did not take the message; free it like the timer callbacks do
    rpcFreeCont(rpcMsg.pCont);
  }

  return code;
}

1954 1955 1956
// LRU cache deleter: the cached value is the entry itself; key and length are unused.
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Insert pEntry into the log store's LRU cache, keyed by its index, with low priority.
// The charge is sizeof(SSyncRaftEntry) + dataLen, and deleteCacheEntry is the deleter.
// *h receives the cache handle (callers in this file release it when non-NULL).
// Returns 0 on TAOS_LRU_STATUS_OK, -1 otherwise.
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
  int32_t   charge = sizeof(*pEntry) + pEntry->dataLen;
  LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, charge,
                                        deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
  return (status == TAOS_LRU_STATUS_OK) ? 0 : -1;
}

M
Minghao Li 已提交
1968 1969 1970
// Build a no-op entry of the current term at the next write index and cache it.
// If this node is the leader, append it to the log store.
// Returns 0, or -1 if the append fails. The cache handle (or the entry itself, if it
// was not cached) is released on every path. The append-failure path used to return
// early and leak it.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  ASSERT(pEntry != NULL);

  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
      sNError(ths, "append noop error");
      ret = -1;
    }
  }

  // release our reference: through the cache if it took the entry, directly otherwise
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

  return ret;
}

S
Shengliang Guan 已提交
1996 1997
// Handle a heartbeat received from the leader.
//  - Same term and this node is not the leader:
//      * reset the election timer and adopt the sender's minMatchIndex
//      * if this node is a follower, enqueue a SYNC_LOCAL_CMD_FOLLOWER_CMT local command
//        carrying the sender's commitIndex
//  - Term >= ours and this node is not a follower: enqueue a SYNC_LOCAL_CMD_STEP_DOWN
//    local command carrying the sender's term.
// Local commands go through syncEqMsg (see syncNodeOnLocalCmd) rather than running inline.
// A heartbeat reply carrying the term at receipt is sent back to the sender.
// Returns 0, or -1 if the reply could not be built (the heartbeat is still processed).
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeat* pMsg = pRpcMsg->pCont;
  syncLogRecvHeartbeat(ths, pMsg, "");

  // Build the reply before processing, so it carries the term we had on receipt.
  // The build result used to be ignored and the NULL payload dereferenced.
  SRpcMsg             rpcMsg = {0};
  SyncHeartbeatReply* pMsgReply = NULL;
  if (syncBuildHeartbeatReply(&rpcMsg, ths->vgId) == 0) {
    pMsgReply = rpcMsg.pCont;
    pMsgReply->destId = pMsg->srcId;
    pMsgReply->srcId = ths->myRaftId;
    pMsgReply->term = ths->pRaftStore->currentTerm;
    pMsgReply->privateTerm = 8864;  // magic number
  } else {
    sNError(ths, "failed to build heartbeat reply");
  }

  if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeResetElectTimer(ths);
    ths->minMatchIndex = pMsg->minMatchIndex;

    if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
      SRpcMsg rpcMsgLocalCmd = {0};
      if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
        sError("vgId:%d, sync build fc-commit msg error", ths->vgId);
      } else {
        SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
        pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
        pSyncMsg->fcIndex = pMsg->commitIndex;

        if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
          int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
          if (code != 0) {
            sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
            rpcFreeCont(rpcMsgLocalCmd.pCont);
          } else {
            // the queue owns the message now; log from our own copy of the value
            sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pMsg->commitIndex);
          }
        } else {
          // nowhere to enqueue it: do not leak the built message
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        }
      }
    }
  }

  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    SRpcMsg rpcMsgLocalCmd = {0};
    if (syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId) != 0) {
      sError("vgId:%d, sync build step-down msg error", ths->vgId);
    } else {
      SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
      pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
      pSyncMsg->sdNewTerm = pMsg->term;

      if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
        int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
        if (code != 0) {
          sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
          rpcFreeCont(rpcMsgLocalCmd.pCont);
        } else {
          // the queue owns the message now; log from our own copy of the value
          sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pMsg->term);
        }
      } else {
        // nowhere to enqueue it: do not leak the built message
        rpcFreeCont(rpcMsgLocalCmd.pCont);
      }
    }
  }

  if (pMsgReply == NULL) {
    return -1;
  }

  // reply
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
  return 0;
}

2066 2067
// Handle a heartbeat reply: record when we last heard from the replying peer.
// The recorded time is used to decide whether that peer is alive. Always returns 0.
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
  syncLogRecvHeartbeatReply(ths, pMsg, "");

  // The replying peer is the reply's *source*. A reply's destId is this node
  // (syncNodeOnHeartbeat sets destId = the heartbeat's srcId). Keying by destId
  // recorded the leader itself instead of the peer.
  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, pMsg->startTime);
  return 0;
}

S
Shengliang Guan 已提交
2075 2076
// Execute a local command previously enqueued by this node:
//   SYNC_LOCAL_CMD_STEP_DOWN    -> syncNodeStepDown(sdNewTerm)
//   SYNC_LOCAL_CMD_FOLLOWER_CMT -> syncNodeFollowerCommit(fcIndex)
// Unknown commands are logged. Always returns 0.
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  SyncLocalCmd* pMsg = pRpcMsg->pCont;
  syncLogRecvLocalCmd(ths, pMsg, "");

  if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    syncNodeStepDown(ths, pMsg->sdNewTerm);
    return 0;
  }

  if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    syncNodeFollowerCommit(ths, pMsg->fcIndex);
    return 0;
  }

  sNError(ths, "error local cmd");
  return 0;
}

M
Minghao Li 已提交
2092 2093 2094 2095 2096 2097 2098 2099 2100 2101
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
2102

2103
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
S
Shengliang Guan 已提交
2104
  sNTrace(ths, "on client request");
2105

M
Minghao Li 已提交
2106
  int32_t ret = 0;
2107
  int32_t code = 0;
M
Minghao Li 已提交
2108

M
Minghao Li 已提交
2109
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
2110
  SyncTerm        term = ths->pRaftStore->currentTerm;
2111 2112 2113 2114 2115 2116 2117
  SSyncRaftEntry* pEntry;

  if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
  } else {
    pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
  }
M
Minghao Li 已提交
2118

2119 2120 2121
  LRUHandle* h = NULL;
  syncCacheEntry(ths->pLogStore, pEntry, &h);

M
Minghao Li 已提交
2122
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
2123 2124 2125
    // append entry
    code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
    if (code != 0) {
2126 2127 2128 2129 2130 2131
      if (ths->replicaNum == 1) {
        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
2132

2133 2134 2135 2136
        return -1;

      } else {
        // del resp mgr, call FpCommitCb
2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147
        SFsmCbMeta cbMeta = {
            .index = pEntry->index,
            .lastConfigIndex = SYNC_INDEX_INVALID,
            .isWeak = pEntry->isWeak,
            .code = -1,
            .state = ths->state,
            .seqNum = pEntry->seqNum,
            .term = pEntry->term,
            .currentTerm = ths->pRaftStore->currentTerm,
            .flag = 0,
        };
2148
        ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
2149 2150 2151 2152 2153 2154 2155

        if (h) {
          taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }

2156 2157
        return -1;
      }
2158
    }
M
Minghao Li 已提交
2159

2160 2161
    // if mulit replica, start replicate right now
    if (ths->replicaNum > 1) {
M
Minghao Li 已提交
2162
      syncNodeReplicate(ths);
2163
    }
2164

2165 2166
    // if only myself, maybe commit right now
    if (ths->replicaNum == 1) {
2167 2168 2169 2170 2171
      if (syncNodeIsMnode(ths)) {
        syncMaybeAdvanceCommitIndex(ths);
      } else {
        syncOneReplicaAdvance(ths);
      }
2172
    }
M
Minghao Li 已提交
2173 2174
  }

2175 2176 2177 2178 2179 2180 2181 2182
  if (pRetIndex != NULL) {
    if (ret == 0 && pEntry != NULL) {
      *pRetIndex = pEntry->index;
    } else {
      *pRetIndex = SYNC_INDEX_INVALID;
    }
  }

2183 2184 2185 2186 2187 2188
  if (h) {
    taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
  } else {
    syncEntryDestory(pEntry);
  }

M
Minghao Li 已提交
2189
  return ret;
2190
}
M
Minghao Li 已提交
2191

S
Shengliang Guan 已提交
2192 2193 2194
/*
 * Short lowercase name of a sync role, intended for log output.
 * Any value other than follower / candidate / leader maps to "error".
 */
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    return "follower";
  }
  if (state == TAOS_SYNC_STATE_CANDIDATE) {
    return "candidate";
  }
  if (state == TAOS_SYNC_STATE_LEADER) {
    return "leader";
  }
  return "error";
}
2204

2205
#if 0
/*
 * Act on a committed TDMT_SYNC_LEADER_TRANSFER entry.
 *
 * The request is only considered when all of the following hold:
 * - this node is a follower;
 * - it has finished restoring;
 * - it is not behind the entry in term or log index.
 *
 * If the transfer names this node (by raft id, or by fqdn + port), the election
 * timer is restarted with a 1 ms timeout. Afterwards the FSM leader-transfer
 * callback, when one is registered, is invoked. Always returns 0.
 *
 * Compiled out: the call site visible in syncNodeDoCommit is also under #if 0.
 */
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  // guard clauses: each rejection is only traced, it is not treated as an error
  if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
    sNTrace(ths, "I am not follower, can not do leader transfer");
    return 0;
  }
  if (!ths->restoreFinish) {
    sNTrace(ths, "restore not finish, can not do leader transfer");
    return 0;
  }
  if (pEntry->term < ths->pRaftStore->currentTerm) {
    sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term);
    return 0;
  }
  if (pEntry->index < syncNodeGetLastIndex(ths)) {
    sNTrace(ths, "little index:%" PRId64 ", can not do leader transfer", pEntry->index);
    return 0;
  }

  SyncLeaderTransfer* pTransfer = pRpcMsg->pCont;
  sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index);

  bool matchById = syncUtilSameId(&(pTransfer->newLeaderId), &(ths->myRaftId));
  bool matchByAddr = (strcmp(pTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0) &&
                     (pTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort);

  if (matchById || matchByAddr) {
    // this node is the requested target: let the election timer fire almost immediately
    int32_t ret = syncNodeRestartElectTimer(ths, 1);
    ASSERT(ret == 0);

    // NOTE(review): newLeaderId.addr is printed with PRId64; if addr is unsigned, PRIu64 fits better — confirm
    sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pTransfer->newNodeInfo.nodeFqdn,
            pTransfer->newNodeInfo.nodePort, pTransfer->newLeaderId.addr);
  }

  if (ths->pFsm->FpLeaderTransferCb == NULL) {
    return 0;
  }

  SFsmCbMeta cbMeta = {
      .code = 0,
      .currentTerm = ths->pRaftStore->currentTerm,
      .flag = 0,
      .index = pEntry->index,
      .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
      .isWeak = pEntry->isWeak,
      .seqNum = pEntry->seqNum,
      .state = ths->state,
      .term = pEntry->term,
  };
  ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
  return 0;
}
#endif

2272
/*
 * Locate this node inside a proposed configuration.
 *
 * Each pNewCfg->nodeInfo entry is turned into a raft id (fqdn:port address plus
 * this node's vgId) and compared with ths->myRaftId. On the first match its
 * position is stored in pNewCfg->myIndex.
 *
 * @return 0 when this node is in pNewCfg, -1 otherwise (myIndex left untouched)
 */
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
  int32_t hit = -1;

  for (int32_t idx = 0; hit < 0 && idx < pNewCfg->replicaNum; ++idx) {
    SRaftId candidate = {
        .addr = syncUtilAddr2U64(pNewCfg->nodeInfo[idx].nodeFqdn, pNewCfg->nodeInfo[idx].nodePort),
        .vgId = ths->vgId,
    };
    if (syncUtilSameId(&(ths->myRaftId), &candidate)) {
      hit = idx;
    }
  }

  if (hit < 0) {
    return -1;
  }
  pNewCfg->myIndex = hit;
  return 0;
}

2287 2288 2289 2290
/*
 * True when a message qualifies for the single-replica fast path: the group has
 * exactly one replica, the message type is a user commit, and the vgId is not 1.
 * Conditions are tested in that order and stop at the first failure.
 */
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  if (ths->replicaNum != 1) {
    return false;
  }
  if (!syncUtilUserCommit(pMsg->msgType)) {
    return false;
  }
  return ths->vgId != 1;
}

M
Minghao Li 已提交
2291
/*
 * Apply the log entries in [beginIndex, endIndex] to the state machine.
 *
 * If the FSM exposes snapshot info and its lastApplyIndex already covers the
 * start of the range, the covered prefix is skipped. For each remaining entry:
 * - the entry is taken from the log-store LRU cache, or read from the log store;
 * - user-commit entries go to FpCommitCb, unless the single-replica path applies
 *   (replicaNum == 1, restore finished, vgId != 1). That path is executed
 *   outside syncPropose instead.
 * When the applied entry is the last log index and restoreFinish is still false,
 * FpRestoreFinishCb (if set) is called and restoreFinish becomes true.
 *
 * @param ths        sync node; NULL yields -1
 * @param beginIndex first index to apply (inclusive)
 * @param endIndex   last index to apply (inclusive); beginIndex > endIndex yields 0
 * @param flag       forwarded to the FSM as SFsmCbMeta.flag
 * @return 0 on success, -1 if ths is NULL. Entries that cannot be read are
 *         logged and skipped; they do not fail the call.
 */
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  if (beginIndex > endIndex) {
    return 0;
  }

  if (ths == NULL) {
    return -1;
  }

  if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
    SSnapshot snapshot = {0};
    ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
      sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);

      // entries up to lastApplyIndex are already reflected in the snapshot
      beginIndex = snapshot.lastApplyIndex + 1;
    }
  }

  sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        // initialised so a successful read that leaves the out-param untouched is detected, not dereferenced
        SSyncRaftEntry* pEntry = NULL;
        SLRUCache*      pCache = ths->pLogStore->pCache;
        LRUHandle*      h = taosLRUCacheLookup(pCache, &i, sizeof(i));
        if (h) {
          pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
        } else {
          int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
          if (code != 0 || pEntry == NULL) {
            sNError(ths, "get log entry error");
            sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
            continue;
          }
        }

        SRpcMsg rpcMsg = {0};
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

        // user commit
        if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
          bool internalExecute = true;
          if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
            internalExecute = false;
          }

          sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);

          // execute fsm in apply thread, or execute outside syncPropose
          if (internalExecute) {
            SFsmCbMeta cbMeta = {
                .index = pEntry->index,
                .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
                .isWeak = pEntry->isWeak,
                .code = 0,
                .state = ths->state,
                .seqNum = pEntry->seqNum,
                .term = pEntry->term,
                .currentTerm = ths->pRaftStore->currentTerm,
                .flag = flag,
            };

            syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
            ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
          }
        }

        // leader-transfer entries are executed in pre-commit, not here

        // restore finish
        // if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;

            int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
            sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        // cached entries are owned by the LRU cache; entries read from the log store are owned here
        if (h) {
          taosLRUCacheRelease(pCache, h, false);
        } else {
          syncEntryDestory(pEntry);
        }
      }
    }
  }
  return 0;
}

/*
 * Membership test: true when pRaftId equals one of the first replicaNum
 * entries of ths->replicasId. The scan stops at the first match.
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool member = false;
  for (int32_t idx = 0; !member && idx < ths->replicaNum; ++idx) {
    member = syncUtilSameId(&(ths->replicasId[idx]), pRaftId);
  }
  return member;
}

/*
 * Snapshot sender paired with replica pDestId, or NULL when pDestId is not a
 * replica. All replicas are scanned; if several ids match, the last one wins.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pFound = NULL;
  int32_t              idx = 0;
  while (idx < ths->replicaNum) {
    if (syncUtilSameId(pDestId, &(ths->replicasId[idx]))) {
      pFound = ths->senders[idx];
    }
    ++idx;
  }
  return pFound;
}
M
Minghao Li 已提交
2421

2422 2423
/*
 * Address of the per-peer heartbeat timer slot for replica pDestId, or NULL
 * when pDestId is not a replica. Full scan; the last matching slot is returned.
 */
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) {
  SSyncTimer* pMatch = NULL;
  for (int32_t idx = 0; idx < ths->replicaNum; ++idx) {
    if (!syncUtilSameId(pDestId, &(ths->replicasId[idx]))) {
      continue;
    }
    pMatch = &(ths->peerHeartbeatTimerArr[idx]);
  }
  return pMatch;
}

M
Minghao Li 已提交
2432 2433
/*
 * Address of the tracked SPeerState for replica pDestId, or NULL when pDestId
 * is not a replica.
 *
 * The scan runs from the end and stops at the first hit. That selects the same
 * slot a forward scan keeping the last hit would, if ids were ever duplicated.
 */
SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
  for (int32_t idx = ths->replicaNum - 1; idx >= 0; --idx) {
    if (syncUtilSameId(pDestId, &(ths->replicasId[idx]))) {
      return &(ths->peerStates[idx]);
    }
  }
  return NULL;
}

/*
 * Decide whether an AppendEntries message should go to pDestId now.
 *
 * Returns false when pDestId has no peer state (logged as a possibly dropped
 * replica). Also returns false when the same index (prevLogIndex + 1) was last
 * sent to this peer less than SYNC_APPEND_ENTRIES_TIMEOUT_MS ago. Otherwise
 * returns true.
 */
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
  SPeerState* pPeer = syncNodeGetPeerState(ths, pDestId);
  if (pPeer == NULL) {
    sError("vgId:%d, replica maybe dropped", ths->vgId);
    return false;
  }

  SyncIndex nextIndex = pMsg->prevLogIndex + 1;
  int64_t   nowMs = taosGetTimestampMs();

  bool sameIndex = (pPeer->lastSendIndex == nextIndex);
  bool recentlySent = (nowMs - pPeer->lastSendTime < SYNC_APPEND_ENTRIES_TIMEOUT_MS);

  // suppress a duplicate send while the previous one is still within the timeout window
  return !(sameIndex && recentlySent);
}

M
Minghao Li 已提交
2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472
/*
 * Whether a configuration change may start now.
 *
 * Refused (with an error log) when any of the following holds:
 * - a change is already in progress;
 * - something has been committed but the commit index has not reached the last log index;
 * - any peer has an active snapshot sender.
 */
bool syncNodeCanChange(SSyncNode* pSyncNode) {
  if (pSyncNode->changing) {
    sError("sync cannot change");
    return false;
  }

  // the last-index comparison only applies once a valid commit index exists
  bool hasCommit = (pSyncNode->commitIndex >= SYNC_INDEX_BEGIN);
  if (hasCommit && pSyncNode->commitIndex != syncNodeGetLastIndex(pSyncNode)) {
    sError("sync cannot change2");
    return false;
  }

  for (int32_t idx = 0; idx < pSyncNode->peersNum; ++idx) {
    SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId[idx]));
    if (pSender == NULL || !pSender->start) {
      continue;
    }
    sError("sync cannot change3");
    return false;
  }

  return true;
}