syncMain.c 14.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local function ---------
M
Minghao Li 已提交
26 27 28
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);

M
Minghao Li 已提交
29 30 31 32
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);

M
Minghao Li 已提交
33 34 35
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
36
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
M
Minghao Li 已提交
37 38 39 40 41 42 43

static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
M
Minghao Li 已提交
44 45 46

void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
M
Minghao Li 已提交
47 48 49 50 51 52
// ---------------------------------

// Module initialization hook. At present it only emits a trace line and
// returns 0; no state is set up here.
int32_t syncInit() {
  sTrace("syncInit ok");
  return 0;
}
M
Minghao Li 已提交
53

M
Minghao Li 已提交
54
// Module teardown hook. At present it only emits a trace line.
void syncCleanUp() { sTrace("syncCleanUp ok"); }
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57 58 59 60
// Opens a sync node from pSyncInfo and always returns 0.
// NOTE(review): the node returned by syncNodeOpen() is neither stored nor
// handed back to the caller here, so nothing visible in this file can later
// close it -- presumably a handle/ref id is meant to be returned; confirm.
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
  return 0;
}
M
Minghao Li 已提交
61 62 63 64 65

// Placeholder: the body is empty, so `rid` is ignored and nothing is stopped.
void syncStop(int64_t rid) {}

// Placeholder: ignores both arguments and always returns 0.
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }

M
Minghao Li 已提交
66
// Placeholder: ignores all arguments, forwards nothing, and always returns 0.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
M
Minghao Li 已提交
67 68 69

// Placeholder: always reports TAOS_SYNC_STATE_LEADER, regardless of `rid`.
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }

M
Minghao Li 已提交
70 71 72 73 74
// Placeholder: the body is empty, so *pNodeRole is left untouched.
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
75
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
76

M
Minghao Li 已提交
77 78 79 80 81 82 83
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
84 85
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
86 87 88 89 90 91 92 93 94 95 96 97

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
98
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
99 100
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
101
  // init ping timer
M
Minghao Li 已提交
102
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
103 104 105
  pSyncNode->pingTimerMS = PING_TIMER_MS;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
106
  pSyncNode->FpPingTimer = syncNodeEqPingTimer;
M
Minghao Li 已提交
107
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
108

M
Minghao Li 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS();
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
  pSyncNode->FpElectTimer = syncNodeEqElectTimer;
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
  pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer;
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
125 126 127 128 129 130
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
131
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
132 133 134 135 136 137 138 139 140

  return pSyncNode;
}

// Releases a node obtained from syncNodeOpen(). pSyncNode must not be NULL.
// NOTE(review): no timer is cancelled here, yet the timer start functions in
// this file register pSyncNode as the timer callback parameter -- confirm
// that callers stop/cancel timers before closing.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);
  free(pSyncNode);
}

M
Minghao Li 已提交
141
// Sends one ping to every entry of syncCfg.nodeInfo (indices 0..replicaNum-1);
// that list also contains this node itself. Each ping message is built, sent
// through syncNodePing() and destroyed before moving on to the next entry.
void syncNodePingAll(SSyncNode* pSyncNode) {
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);

  int idx = 0;
  while (idx < pSyncNode->syncCfg.replicaNum) {
    SRaftId target;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[idx], pSyncNode->vgId, &target);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &target);
    int32_t   code = syncNodePing(pSyncNode, &target, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);

    ++idx;
  }
}
M
Minghao Li 已提交
153

M
Minghao Li 已提交
154 155 156
// Sends one ping to each entry of peers[] (indices 0..peersNum-1). Each ping
// message is built, sent through syncNodePing() and destroyed in turn.
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int idx = 0;
  while (idx < pSyncNode->peersNum) {
    SRaftId target;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[idx], pSyncNode->vgId, &target);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &target);
    int32_t   code = syncNodePing(pSyncNode, &target, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);

    ++idx;
  }
}
M
Minghao Li 已提交
165

M
Minghao Li 已提交
166
// Sends a single ping whose source and destination are both this node's own
// raft id, then destroys the message.
void syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);

  int32_t code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);

  syncPingDestroy(pPing);
}
M
Minghao Li 已提交
173

M
Minghao Li 已提交
174
// Arms the ping timer with period PING_TIMER_MS.
//
// The timer's logic clock is first synced to the user clock, so the
// `user <= logic` test in syncNodeEqPingTimer() passes again after a previous
// syncNodeStopPingTimer(). The timer is created on first use and reset on
// later calls. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  pSyncNode->pingTimerMS = PING_TIMER_MS;

  if (pSyncNode->pPingTimer != NULL) {
    // A timer handle already exists: re-arm it.
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
    return 0;
  }

  pSyncNode->pPingTimer =
      taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  return 0;
}

// Logically stops the ping timer without cancelling the underlying timer.
//
// Advancing the user clock past the logic clock makes the `user <= logic`
// test in syncNodeEqPingTimer() fail, so that callback neither enqueues a
// timeout nor re-arms itself. The period is also set to TIMER_MAX_MS.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  pSyncNode->pingTimerMS = TIMER_MAX_MS;

  return 0;
}

M
Minghao Li 已提交
193 194 195
// Arms the election timer with a period of `ms` milliseconds.
//
// The period is stored in electTimerMS and the timer's logic clock is synced
// to the user clock. The timer is created on first use and reset on later
// calls. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  pSyncNode->electTimerMS = ms;
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);

  if (pSyncNode->pElectTimer != NULL) {
    // A timer handle already exists: re-arm it.
    taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
    return 0;
  }

  pSyncNode->pElectTimer =
      taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  return 0;
}

M
Minghao Li 已提交
206
// Logically stops the election timer without cancelling the underlying timer.
//
// Advancing the user clock past the logic clock makes the `user <= logic`
// test in syncNodeEqElectTimer() fail. The period is also set to
// TIMER_MAX_MS. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);

  pSyncNode->electTimerMS = TIMER_MAX_MS;

  return 0;
}

M
Minghao Li 已提交
212 213 214 215 216 217
// Restarts the election timer with a period of `ms` milliseconds by calling
// syncNodeStopElectTimer() followed by syncNodeStartElectTimer(). The results
// of both calls are ignored; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
218
// Arms the heartbeat timer with period HEARTBEAT_TIMER_MS.
//
// The timer's logic clock is synced to the user clock, and the period is
// restored to HEARTBEAT_TIMER_MS. The timer is created on first use and reset
// on later calls. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);

  // syncNodeStopHeartbeatTimer() leaves heartbeatTimerMS at TIMER_MAX_MS.
  // Without restoring it here (as syncNodeStartPingTimer() does for the ping
  // period), a restarted heartbeat would run with the maximum period.
  pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;

  if (pSyncNode->pHeartbeatTimer == NULL) {
    pSyncNode->pHeartbeatTimer =
        taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
  }
  return 0;
}

// Logically stops the heartbeat timer without cancelling the underlying timer.
//
// The heartbeat user clock is advanced by one and the period is set to
// TIMER_MAX_MS. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);

  pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;

  return 0;
}

M
Minghao Li 已提交
236
// ------ local function ---------
M
Minghao Li 已提交
237
// Converts pMsg into an RPC message, traces both representations as JSON,
// and sends the RPC message to destRaftId.
//
// pMsg stays owned by the caller (the callers in this file destroy it after
// this function returns).
//
// Returns the result of syncNodeSendMsgById().
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);

  // Trace the message as given by the caller.
  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  // Trace the RPC payload *before* sending, so rpcMsg.pCont is not read
  // after it has been handed to the transport.
  // NOTE(review): presumably FpSendMsg may take ownership of pCont -- confirm.
  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  // Propagate the send result instead of unconditionally reporting success.
  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

M
Minghao Li 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
// Resolves destRaftId to an endpoint set and passes pMsg to the node's
// FpSendMsg hook together with its rpcClient. The hook's result, if any, is
// not inspected; always returns 0.
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEp;

  syncUtilraftId2EpSet(destRaftId, &destEp);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEp, pMsg);

  return 0;
}

// Resolves nodeInfo to an endpoint set and passes pMsg to the node's
// FpSendMsg hook together with its rpcClient. The hook's result, if any, is
// not inspected; always returns 0.
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEp;

  syncUtilnodeInfo2EpSet(nodeInfo, &destEp);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEp, pMsg);

  return 0;
}

// Handles an incoming ping: traces it as JSON, then builds a ping reply from
// this node's raft id to the ping's source and sends it.
//
// Returns the result of syncNodeSendMsgById().
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);

  // Propagate the send result instead of unconditionally reporting success.
  int32_t ret = syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // NOTE(review): pMsgReply is never released here, whereas the ping senders
  // in this file destroy their message after sending. This looks like a leak;
  // release it with the reply type's destroy routine once confirmed.
  return ret;
}

M
Minghao Li 已提交
299
// Handles an incoming ping reply. The reply is only traced as JSON; no node
// state is touched. Always returns 0.
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("<-- syncNodeOnPingReplyCb -->");

  cJSON* root = syncPingReply2Json(pMsg);
  char*  text = cJSON_Print(root);
  sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", text);
  free(text);
  cJSON_Delete(root);

  return 0;
}

M
Minghao Li 已提交
314 315
// Handles a timeout message taken from the queue.
//
// The message is traced as JSON. Only SYNC_TIMEOUT_PING is acted on: when the
// ping user clock has not moved past the clock value carried in the message,
// the ping counter is bumped and every configured node is pinged. Election,
// heartbeat and unknown timeout types are currently ignored.
// Always returns 0.
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
  sTrace("<-- syncNodeOnTimeoutCb -->");

  cJSON* root = syncTimeout2Json(pMsg);
  char*  text = cJSON_Print(root);
  sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", text);
  free(text);
  cJSON_Delete(root);

  if (pMsg->timeoutType != SYNC_TIMEOUT_PING) {
    // SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT and anything else:
    // nothing to do yet.
    return 0;
  }

  if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
    ++(ths->pingTimerCounter);
    syncNodePingAll(ths);
  }

  return 0;
}

M
Minghao Li 已提交
340 341
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
342
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
343 344 345 346 347 348 349 350
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
                                              pSyncNode->pingTimerMS, pSyncNode);
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

    // reset timer ms
M
Minghao Li 已提交
351 352
    // pSyncNode->pingTimerMS += 100;

M
Minghao Li 已提交
353 354 355 356 357 358 359 360 361 362 363 364 365
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
    sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock,
           pSyncNode->pingTimerLogicClockUser);
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
                                              pSyncNode->electTimerMS, pSyncNode);
M
Minghao Li 已提交
366 367

    SRpcMsg rpcMsg;
M
Minghao Li 已提交
368 369 370 371
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
372 373 374
    // reset timer ms
    pSyncNode->electTimerMS = syncUtilElectRandomMS();

M
Minghao Li 已提交
375 376 377
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
M
Minghao Li 已提交
378 379
    sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
380 381 382
  }
}

M
Minghao Li 已提交
383 384 385 386
// Heartbeat timer callback. Placeholder: the body is empty, so a firing
// heartbeat timer enqueues nothing and is not re-armed.
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}

// Switches the node to follower state.
//
// If the node was the leader, its cached leader id is cleared. The state is
// then set to follower, the heartbeat timer is stopped and the election timer
// is started with a fresh randomized period.
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  // Must be evaluated before the state changes: only a node stepping down
  // from leader forgets the leader it had cached.
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // Record the new role, mirroring syncNodeBecomeLeader(), which sets
  // TAOS_SYNC_STATE_LEADER.
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;

  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t electMS = syncUtilElectRandomMS();
  syncNodeStartElectTimer(pSyncNode, electMS);
}

// Switches the node to leader state: records itself as the cached leader,
// stops the election timer and starts the heartbeat timer.
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->leaderCache = pSyncNode->raftId;

  // TODO (not implemented): next Index +=1
  // TODO (not implemented): match Index = 0;

  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);

  // TODO (not implemented): appendEntries;
}

// Moves the node from follower to candidate state. The node must currently be
// a follower (checked with assert). Only the state field is changed.
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}

// Placeholder for the candidate -> leader transition; the body is empty.
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}

// Placeholder for the leader -> follower transition; the body is empty.
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}

M
Minghao Li 已提交
417 418 419 420 421
// Placeholder for the candidate -> follower transition; the body is empty.
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}

// Placeholder: the body is empty, so no vote requests are sent.
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}

// Placeholder: the body is empty, so no append-entries messages are sent.
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}