syncMain.c 26.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <inttypes.h>
#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftLog.h"
M
Minghao Li 已提交
26
#include "syncRaftStore.h"
M
Minghao Li 已提交
27
#include "syncReplication.h"
M
Minghao Li 已提交
28 29
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
30
#include "syncTimeout.h"
M
Minghao Li 已提交
31
#include "syncUtil.h"
M
Minghao Li 已提交
32
#include "syncVoteMgr.h"
M
Minghao Li 已提交
33

M
Minghao Li 已提交
34 35 36
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
37
// enqueue message ----
M
Minghao Li 已提交
38 39 40 41
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);

M
Minghao Li 已提交
42
// on message ----
M
Minghao Li 已提交
43 44
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
45

M
Minghao Li 已提交
46
// raft state change ----
M
Minghao Li 已提交
47
static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
M
Minghao Li 已提交
48 49
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
M
Minghao Li 已提交
50

M
Minghao Li 已提交
51
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
M
Minghao Li 已提交
52
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
M
Minghao Li 已提交
53 54
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
M
Minghao Li 已提交
55

M
Minghao Li 已提交
56 57 58
// raft vote ----
static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
static void syncNodeVoteForSelf(SSyncNode* pSyncNode);
M
Minghao Li 已提交
59 60 61
// ---------------------------------

/**
 * Initialise the sync module by starting the shared sync environment.
 *
 * @return whatever syncEnvStart() returns (presumably 0 on success — confirm in syncEnv.c).
 */
int32_t syncInit() { return syncEnvStart(); }
M
Minghao Li 已提交
65

M
Minghao Li 已提交
66 67 68 69
/**
 * Stop the sync environment started by syncInit().
 * Asserts that syncEnvStop() reported 0.
 */
void syncCleanUp() {
  int32_t stopCode = syncEnvStop();
  assert(stopCode == 0);
  (void)stopCode;  // referenced so NDEBUG builds do not warn about an unused variable
}
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
int64_t syncStart(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
72
  int32_t    ret = 0;
M
Minghao Li 已提交
73 74
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
75
  return ret;
M
Minghao Li 已提交
76
}
M
Minghao Li 已提交
77

M
Minghao Li 已提交
78 79 80 81
/**
 * Close the sync node identified by rid.
 *
 * NOTE(review): rid-to-node lookup is not implemented yet, so pSyncNode is always NULL.
 * The guard stops syncNodeClose() from being handed a NULL node. That call would trip its
 * assert, or dereference NULL under NDEBUG.
 */
void syncStop(int64_t rid) {
  SSyncNode* pSyncNode = NULL;  // TODO: get pointer from rid
  (void)rid;
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeClose(pSyncNode);
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83 84 85 86
/**
 * Apply a new configuration to the node identified by rid.
 * Not implemented yet: both arguments are ignored and 0 is returned.
 */
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  (void)rid;
  (void)pSyncCfg;
  return 0;
}
M
Minghao Li 已提交
87

M
Minghao Li 已提交
88
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
M
Minghao Li 已提交
89
  int32_t    ret = 0;
M
Minghao Li 已提交
90 91 92 93 94 95 96
  SSyncNode* pSyncNode = NULL;  // get pointer from rid
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncClientRequestDestroy(pSyncMsg);
M
Minghao Li 已提交
97 98
    ret = 0;

M
Minghao Li 已提交
99 100
  } else {
    sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
101
    ret = -1;  // need define err code !!
M
Minghao Li 已提交
102
  }
M
Minghao Li 已提交
103
  return ret;
M
Minghao Li 已提交
104
}
M
Minghao Li 已提交
105

M
Minghao Li 已提交
106 107 108 109
/**
 * Return the raft role (state) of the node identified by rid.
 *
 * NOTE(review): rid lookup is not implemented, so pNode is NULL and the read below
 * dereferences a null pointer (undefined behavior). TODO: resolve rid and decide what to
 * return for unknown ids.
 */
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pNode = NULL;  // get pointer from rid
  ESyncState role = pNode->state;
  return role;
}
M
Minghao Li 已提交
110

M
Minghao Li 已提交
111 112
// Report the roles of the nodes of rid into pNodeRole. Not implemented yet: does nothing.
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {
  (void)rid;
  (void)pNodeRole;
}

M
Minghao Li 已提交
113
// open/close --------------
M
Minghao Li 已提交
114 115 116
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
117
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
118

M
Minghao Li 已提交
119
  // init by SSyncInfo
M
Minghao Li 已提交
120 121 122
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
123
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
124
  pSyncNode->pWal = pSyncInfo->pWal;
M
Minghao Li 已提交
125 126
  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
127 128
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
129

M
Minghao Li 已提交
130
  // init internal
M
Minghao Li 已提交
131 132
  pSyncNode->myNodeInfo = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncInfo->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
133

M
Minghao Li 已提交
134 135
  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
M
Minghao Li 已提交
136 137 138
  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
M
Minghao Li 已提交
139
      pSyncNode->peersNodeInfo[j] = pSyncInfo->syncCfg.nodeInfo[i];
M
Minghao Li 已提交
140 141 142
      j++;
    }
  }
M
Minghao Li 已提交
143
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
144
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncInfo->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
145
  }
M
Minghao Li 已提交
146

M
Minghao Li 已提交
147 148 149 150 151 152
  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncInfo->syncCfg.replicaNum;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]);
  }

M
Minghao Li 已提交
153
  // init raft algorithm
M
Minghao Li 已提交
154 155 156 157
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
158
  // init life cycle
M
Minghao Li 已提交
159

M
Minghao Li 已提交
160
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
161
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
162
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
163 164
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
165
  // init TLA+ candidate vars
M
Minghao Li 已提交
166 167 168 169 170
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
171 172 173 174 175 176 177 178 179 180
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
  pSyncNode->commitIndex = 0;
M
Minghao Li 已提交
181

M
Minghao Li 已提交
182
  // init ping timer
M
Minghao Li 已提交
183
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
184 185 186
  pSyncNode->pingTimerMS = PING_TIMER_MS;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
187
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
188
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
189

M
Minghao Li 已提交
190 191 192 193 194
  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS();
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
195
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
196 197 198 199 200 201 202
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
203
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
204 205
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
206
  // init callback
M
Minghao Li 已提交
207 208 209 210 211 212
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
213
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
214 215 216 217 218

  return pSyncNode;
}

/**
 * Stop the node's timers, release its sub-components and free the node.
 * A NULL node is ignored.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    return;
  }

  // Disarm the timers before tearing anything down. Their callbacks receive pSyncNode,
  // and the stop functions bump the logic clocks so a late callback treats itself as stale.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);
  (void)ret;  // keeps NDEBUG builds free of unused-variable warnings

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  votesRespondDestory(pSyncNode->pVotesRespond);
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  logStoreDestory(pSyncNode->pLogStore);

  free(pSyncNode);
}

M
Minghao Li 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
// ping --------------

/**
 * Serialise the ping pMsg into an rpc message and send it to destRaftId.
 *
 * @return the result of syncNodeSendMsgById().
 */
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg wireMsg;
  syncPing2RpcMsg(pMsg, &wireMsg);
  syncRpcMsgLog2((char*)"==syncNodePing==", &wireMsg);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &wireMsg);
}

/**
 * Send a ping from this node to itself, asserting that the send succeeded.
 *
 * @return the result of syncNodePing().
 */
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
  int32_t   sendRet = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(sendRet == 0);

  syncPingDestroy(pPing);
  return sendRet;
}

/**
 * Ping every peer (all configured replicas except this node).
 *
 * @return the result of the last ping, or 0 when there are no peers.
 */
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t lastRet = 0;
  for (int idx = 0; idx < pSyncNode->peersNum; ++idx) {
    SRaftId peerId;
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[idx], pSyncNode->vgId, &peerId);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, &peerId);
    lastRet = syncNodePing(pSyncNode, &peerId, pPing);
    assert(lastRet == 0);
    syncPingDestroy(pPing);
  }
  return lastRet;
}

/**
 * Ping every node listed in syncCfg, including this node.
 *
 * @return the result of the last ping, or 0 when no nodes are configured.
 */
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t lastRet = 0;
  for (int idx = 0; idx < pSyncNode->syncCfg.replicaNum; ++idx) {
    SRaftId targetId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[idx], pSyncNode->vgId, &targetId);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, &targetId);
    lastRet = syncNodePing(pSyncNode, &targetId, pPing);
    assert(lastRet == 0);
    syncPingDestroy(pPing);
  }
  return lastRet;
}

// timer control --------------

/**
 * Arm the ping timer so that FpPingTimerCB fires after pingTimerMS.
 *
 * The logic clock is published BEFORE the timer is armed. A callback that fires early
 * then already sees pingTimerLogicClockUser <= pingTimerLogicClock, so it does not treat
 * itself as stale and silently stop re-arming.
 *
 * @return always 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  atomic_store_64(&pSyncNode->pingTimerLogicClock, atomic_load_64(&pSyncNode->pingTimerLogicClockUser));
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  return 0;
}

/**
 * Disarm the ping timer.
 *
 * Bumping pingTimerLogicClockUser means a callback that still fires sees
 * User > LogicClock and does not enqueue or re-arm.
 *
 * @return always 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

/**
 * Arm the election timer to fire FpElectTimerCB after ms milliseconds.
 *
 * @param ms new election timeout, also stored in electTimerMS.
 * @return always 0.
 *
 * The logic clock is published before arming, so an early callback is not mistaken for stale.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  pSyncNode->electTimerMS = ms;
  atomic_store_64(&pSyncNode->electTimerLogicClock, atomic_load_64(&pSyncNode->electTimerLogicClockUser));
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  return 0;
}

/**
 * Disarm the election timer.
 *
 * Bumping electTimerLogicClockUser invalidates any callback that still fires.
 *
 * @return always 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

/**
 * Stop the election timer, then start it again with a timeout of ms milliseconds.
 *
 * @return always 0 (the stop/start results are not inspected).
 */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

/**
 * Arm the heartbeat timer so that FpHeartbeatTimerCB fires after heartbeatTimerMS.
 *
 * The logic clock is published before arming, so an early callback is not mistaken for stale.
 *
 * @return always 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser));
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  return 0;
}

/**
 * Disarm the heartbeat timer.
 *
 * Bumping heartbeatTimerLogicClockUser invalidates any callback that still fires.
 *
 * @return always 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return 0;
}

// utils --------------

/**
 * Resolve destRaftId to an endpoint set and hand pMsg to the FpSendMsg transport callback.
 *
 * @return always 0; the transport callback's outcome is not inspected.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilraftId2EpSet(destRaftId, &destEpSet);
  (void)pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);
  return 0;
}

/**
 * Resolve nodeInfo to an endpoint set and hand pMsg to the FpSendMsg transport callback.
 *
 * @return always 0; the transport callback's outcome is not inspected.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &destEpSet);
  (void)pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
359 360 361 362 363 364 365
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
  char   u64buf[128];
  cJSON* pRoot = cJSON_CreateObject();

  // init by SSyncInfo
  cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
  cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
366 367
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
  cJSON_AddStringToObject(pRoot, "pWal", u64buf);
M
Minghao Li 已提交
368 369 370 371 372 373 374 375 376 377 378 379

  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
  cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
  cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue);
  cJSON_AddStringToObject(pRoot, "queue", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
  cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

  // init internal
M
Minghao Li 已提交
380 381 382 383
  cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
  cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
  cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
  cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);
M
Minghao Li 已提交
384 385 386

  cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
  cJSON* pPeers = cJSON_CreateArray();
M
Minghao Li 已提交
387
  cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
M
Minghao Li 已提交
388
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
389
    cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
M
Minghao Li 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
  }
  cJSON* pPeersId = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
  }

  cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
  cJSON* pReplicasId = cJSON_CreateArray();
  cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
  for (int i = 0; i < pSyncNode->replicaNum; ++i) {
    cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
  }

  // raft algorithm
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
  cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
  cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
  cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
  cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

  // tla+ server vars
M
Minghao Li 已提交
412 413
  cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
  cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
414 415 416
  char tmpBuf[RAFT_STORE_BLOCK_SIZE];
  raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf));
  cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf);
M
Minghao Li 已提交
417 418

  // tla+ candidate vars
M
Minghao Li 已提交
419 420
  cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
  cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));
M
Minghao Li 已提交
421 422

  // tla+ leader vars
M
Minghao Li 已提交
423 424
  cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
  cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));
M
Minghao Li 已提交
425 426

  // tla+ log vars
M
Minghao Li 已提交
427 428 429
  cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
  snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->commitIndex);
  cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
M
Minghao Li 已提交
430

M
Minghao Li 已提交
431 432 433 434 435 436 437 438
  // ping timer
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
  cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
  cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClock);
  cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser);
  cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
M
Minghao Li 已提交
439 440
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
  cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
441 442 443 444 445 446 447 448 449 450 451
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter);
  cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

  // elect timer
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
  cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
  cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClock);
  cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser);
  cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
M
Minghao Li 已提交
452 453
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
  cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
454 455 456 457 458 459 460 461 462 463 464
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter);
  cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

  // heartbeat timer
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
  cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
  cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClock);
  cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser);
  cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
M
Minghao Li 已提交
465 466
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
  cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
  snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter);
  cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

  // callback
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
  cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
  cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
  cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
  cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
  cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
  cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
  snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
  cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);

M
Minghao Li 已提交
486 487 488 489 490
  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
491 492 493 494 495 496 497
/**
 * Render pSyncNode as a formatted JSON string.
 *
 * The intermediate cJSON tree is deleted here. The returned cJSON_Print() buffer belongs to
 * the caller; the debug helpers in this file release it with free().
 */
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
// for debug --------------

/**
 * Print the JSON form of pObj to stdout and flush all output streams.
 * Does nothing if serialisation returns NULL.
 */
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;  // strlen(NULL) would be undefined behavior
  }
  // %zu matches strlen()'s size_t; %lu is wrong wherever unsigned long differs from size_t.
  printf("syncNodePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  free(serialized);
}

/**
 * Print the label s and the JSON form of pObj to stdout, then flush all output streams.
 * Does nothing if serialisation returns NULL.
 */
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;  // strlen(NULL) would be undefined behavior
  }
  // %zu is the correct conversion for size_t.
  printf("syncNodePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  free(serialized);
}

/**
 * Emit the JSON form of pObj at trace level.
 * Does nothing if serialisation returns NULL.
 */
void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;  // strlen(NULL) would be undefined behavior
  }
  // %zu is the correct conversion for size_t.
  sTrace("syncNodeLog | len:%zu | %s", strlen(serialized), serialized);
  free(serialized);
}

/**
 * Emit the label s and the JSON form of pObj at trace level.
 * Does nothing if serialisation returns NULL.
 */
void syncNodeLog2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;  // strlen(NULL) would be undefined behavior
  }
  // %zu is the correct conversion for size_t.
  sTrace("syncNodeLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
  free(serialized);
}

M
Minghao Li 已提交
525
// ------ local functions ---------
M
Minghao Li 已提交
526
// enqueue message ----
M
Minghao Li 已提交
527 528
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
529
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
530 531 532 533
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
                                              pSyncNode->pingTimerMS, pSyncNode);
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
534
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
535 536 537
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
538
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
539 540
                 &pSyncNode->pPingTimer);
  } else {
M
Minghao Li 已提交
541 542
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu",
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
543 544 545 546 547 548 549 550
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
                                              pSyncNode->electTimerMS, pSyncNode);
M
Minghao Li 已提交
551
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
552
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
553
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
554 555 556
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
557 558
    // reset timer ms
    pSyncNode->electTimerMS = syncUtilElectRandomMS();
M
Minghao Li 已提交
559
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
560 561
                 &pSyncNode->pPingTimer);
  } else {
M
Minghao Li 已提交
562
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
M
Minghao Li 已提交
563
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
564 565 566
  }
}

M
Minghao Li 已提交
567 568 569 570 571 572 573 574 575
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
                          pSyncNode->heartbeatTimerMS, pSyncNode);
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
576
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
577 578 579
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
580
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
581 582
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
583
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",
M
Minghao Li 已提交
584 585 586 587
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
// on message ----

/**
 * Handle an incoming ping: log it and answer the sender (pMsg->srcId) with a ping reply.
 *
 * @return always 0.
 */
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  int32_t ret = 0;
  syncPingLog2("==syncNodeOnPingCb==", pMsg);
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // The reply struct is only needed to build rpcMsg. Release it the same way
  // syncNodePingSelf releases its ping after sending; it was previously leaked.
  syncPingReplyDestroy(pMsgReply);
  return ret;
}

/**
 * Handle an incoming ping reply. It is currently only logged.
 *
 * @return always 0.
 */
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  (void)ths;
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return 0;
}

M
Minghao Li 已提交
606
// raft state change ----
M
Minghao Li 已提交
607
/**
 * Raft term update.
 *
 * When term is newer than the raft store's currentTerm: store the new term, step down to
 * follower, and clear the recorded vote. Terms that are not newer are ignored.
 */
static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
  syncNodeBecomeFollower(pSyncNode);
  raftStoreClearVote(pSyncNode->pRaftStore);
}
M
Minghao Li 已提交
614

M
Minghao Li 已提交
615 616
/**
 * Switch this node to follower.
 *
 * - Forget the cached leader if this node was the leader.
 * - Stop the heartbeat timer.
 * - Restart the election timer with a timeout from syncUtilElectRandomMS().
 */
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeRestartElectTimer(pSyncNode, syncUtilElectRandomMS());
}

M
Minghao Li 已提交
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644
// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
M
Minghao Li 已提交
645 646
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
647
  pSyncNode->leaderCache = pSyncNode->myRaftId;
M
Minghao Li 已提交
648

M
Minghao Li 已提交
649 650 651 652 653 654 655
  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }
M
Minghao Li 已提交
656 657 658

  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
659 660
  syncNodeReplicate(pSyncNode);
}
M
Minghao Li 已提交
661

M
Minghao Li 已提交
662 663 664 665
/**
 * Candidate -> leader transition.
 * Asserts candidate state and a majority of granted votes before becoming leader.
 */
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode);
}

/**
 * Follower -> candidate transition.
 * Only the state field changes here; no timers or votes are touched.
 */
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}

M
Minghao Li 已提交
673 674 675 676
/**
 * Leader -> follower transition (asserts leader state first).
 */
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode);
}
M
Minghao Li 已提交
677

M
Minghao Li 已提交
678 679 680 681
/**
 * Candidate -> follower transition (asserts candidate state first).
 */
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode);
}
M
Minghao Li 已提交
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703

// raft vote ----

/**
 * Record a vote for pRaftId in the raft store.
 *
 * Asserted preconditions: term is the store's current term, and no vote has been cast yet.
 */
static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

/**
 * Vote for this node in the current term and count that vote locally.
 *
 * A synthetic granted SyncRequestVoteReply (self -> self, current term) is fed into both
 * the granted-vote and responded-vote trackers, then destroyed.
 */
static void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pSelfReply = syncRequestVoteReplyBuild();
  pSelfReply->srcId = pSyncNode->myRaftId;
  pSelfReply->destId = pSyncNode->myRaftId;
  pSelfReply->term = pSyncNode->pRaftStore->currentTerm;
  pSelfReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pSelfReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pSelfReply);
  syncRequestVoteReplyDestroy(pSelfReply);
}