syncMain.c 44.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncTimeout.h"
M
Minghao Li 已提交
33
#include "syncUtil.h"
M
Minghao Li 已提交
34
#include "syncVoteMgr.h"
M
Minghao Li 已提交
35
#include "tref.h"
M
Minghao Li 已提交
36

M
Minghao Li 已提交
37 38 39
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
40
// enqueue message ----
M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
// process message ----
M
Minghao Li 已提交
48 49 50
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
51 52 53

// life cycle
static void syncFreeNode(void* param);
M
Minghao Li 已提交
54 55 56
// ---------------------------------

// Initialize the sync module: open the SSyncNode reference pool and start the
// sync environment. Does nothing if the environment is already running.
// Returns 0 on success, -1 if the ref pool could not be opened, otherwise the
// result of syncEnvStart().
int32_t syncInit() {
  if (syncEnvIsStart()) {
    return 0;
  }

  // 200 is the capacity handed to taosOpenRef; syncFreeNode runs when a handle is finally released.
  tsNodeRefId = taosOpenRef(200, syncFreeNode);
  if (tsNodeRefId < 0) {
    sError("failed to init node ref");
    // The env has not been started on this path, so there is nothing for
    // syncCleanUp() to stop (and its assert on syncEnvStop() could fire).
    // The failed id may be any negative code, not just -1, so normalize it
    // so later syncCleanUp() calls do not try to close a bogus pool.
    tsNodeRefId = -1;
    return -1;
  }

  return syncEnvStart();
}
M
Minghao Li 已提交
72

M
Minghao Li 已提交
73 74 75
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
M
Minghao Li 已提交
76 77 78 79 80

  if (tsNodeRefId != -1) {
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }
M
Minghao Li 已提交
81
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
84 85
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
  syncNodeLog2("syncNodeOpen open success", pSyncNode);

M
Minghao Li 已提交
89 90 91 92 93 94 95
  pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
  if (pSyncNode->rid < 0) {
    syncFreeNode(pSyncNode);
    return -1;
  }

  return pSyncNode->rid;
M
Minghao Li 已提交
96
}
M
Minghao Li 已提交
97

M
Minghao Li 已提交
98 99 100 101 102 103 104 105 106 107
void syncStart(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStart(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
108 109 110 111 112 113 114 115 116 117
void syncStartStandBy(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStartStandBy(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
118
void syncStop(int64_t rid) {
M
Minghao Li 已提交
119 120 121 122
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
123
  syncNodeClose(pSyncNode);
M
Minghao Li 已提交
124 125 126

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  taosRemoveRef(tsNodeRefId, rid);
M
Minghao Li 已提交
127
}
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129 130
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  int32_t ret = 0;
M
Minghao Li 已提交
131
  char*   configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
M
Minghao Li 已提交
132 133
  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
S
Shengliang Guan 已提交
134
  rpcMsg.info.noResp = 1;
M
Minghao Li 已提交
135 136 137 138 139
  rpcMsg.contLen = strlen(configChange) + 1;
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
  taosMemoryFree(configChange);
  ret = syncPropose(rid, &rpcMsg, false);
M
Minghao Li 已提交
140 141
  return ret;
}
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143 144 145 146
// Forward pMsg through the sync layer; this is a thin alias for syncPropose().
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150 151 152 153
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
154 155 156 157
  ESyncState state = pSyncNode->state;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return state;
M
Minghao Li 已提交
158 159
}

M
Minghao Li 已提交
160 161 162 163 164
const char* syncGetMyRoleStr(int64_t rid) {
  const char* s = syncUtilState2String(syncGetMyRole(rid));
  return s;
}

M
Minghao Li 已提交
165 166 167 168 169 170 171 172 173 174 175 176
int32_t syncGetVgId(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  int32_t vgId = pSyncNode->vgId;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return vgId;
}

M
Minghao Li 已提交
177
SyncTerm syncGetMyTerm(int64_t rid) {
M
Minghao Li 已提交
178 179
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
180 181 182 183 184 185 186 187 188
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return term;
}

M
Minghao Li 已提交
189 190 191 192 193 194 195 196 197
// Fill pEpSet with the fqdn/port of every replica in the node's raft config.
// inUse is set to this node's own index. If rid is unknown, pEpSet is zeroed.
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  assert(rid == pNode->rid);

  pEpSet->numOfEps = 0;
  for (int replica = 0; replica < pNode->pRaftCfg->cfg.replicaNum; replica++) {
    snprintf(pEpSet->eps[replica].fqdn, sizeof(pEpSet->eps[replica].fqdn), "%s",
             pNode->pRaftCfg->cfg.nodeInfo[replica].nodeFqdn);
    pEpSet->eps[replica].port = pNode->pRaftCfg->cfg.nodeInfo[replica].nodePort;
    pEpSet->numOfEps++;

    sInfo("syncGetEpSet index:%d %s:%d", replica, pEpSet->eps[replica].fqdn, pEpSet->eps[replica].port);
  }

  pEpSet->inUse = pNode->pRaftCfg->cfg.myIndex;
  sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);

  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
// Look up the response stub registered under `index` without removing it.
// Returns syncRespMgrGet()'s result (1 when found, in which case *msg receives
// the stored rpc message), or TAOS_SYNC_STATE_ERROR for an unknown rid.
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGet(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    *msg = stub.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return found;
}

// Like syncGetRespRpc, but also removes the stub from the response manager.
// Returns syncRespMgrGetAndDel()'s result (1 when found), or
// TAOS_SYNC_STATE_ERROR for an unknown rid.
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGetAndDel(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    *msg = stub.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return found;
}

S
Shengliang Guan 已提交
245
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
M
Minghao Li 已提交
246 247 248 249 250 251
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
    return;
  }
  assert(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
252
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

// Return syncNode2SimpleStr() for the node identified by rid, or NULL if the
// rid is unknown. The caller owns the returned string (as produced by syncNode2SimpleStr).
char* sync2SimpleStr(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    // Log under this function's own name, and use a format that matches int64_t portably.
    sTrace("sync2SimpleStr get pSyncNode is NULL, rid:%lld", (long long)rid);
    return NULL;
  }
  assert(rid == pSyncNode->rid);
  char* s = syncNode2SimpleStr(pSyncNode);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->electBaseLine = electTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
M
Minghao Li 已提交
306 307
  sTrace("syncPropose msgType:%d ", pMsg->msgType);

M
Minghao Li 已提交
308 309 310 311
  int32_t    ret = TAOS_SYNC_PROPOSE_SUCCESS;
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
M
Minghao Li 已提交
312 313 314
  }
  assert(rid == pSyncNode->rid);

M
Minghao Li 已提交
315
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
M
Minghao Li 已提交
316 317 318 319 320 321
    SRespStub stub;
    stub.createTime = taosGetTimestampMs();
    stub.rpcMsg = *pMsg;
    uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
M
Minghao Li 已提交
322 323
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
324
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
325
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
326 327 328
    } else {
      sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
329
    syncClientRequestDestroy(pSyncMsg);
M
Minghao Li 已提交
330
    ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
331

M
Minghao Li 已提交
332
  } else {
M
Minghao Li 已提交
333
    sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
334
    ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
M
Minghao Li 已提交
335
  }
M
Minghao Li 已提交
336 337 338 339 340

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

M
Minghao Li 已提交
341
// open/close --------------
M
Minghao Li 已提交
342
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
wafwerar's avatar
wafwerar 已提交
343
  SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
M
Minghao Li 已提交
344
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
345
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
346

M
Minghao Li 已提交
347 348 349 350 351 352 353 354 355 356 357 358
  int32_t ret = 0;
  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      return NULL;
    }

    // create raft config file
    snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
    ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath);
    assert(ret == 0);
M
Minghao Li 已提交
359 360
  }

M
Minghao Li 已提交
361
  // init by SSyncInfo
M
Minghao Li 已提交
362 363
  pSyncNode->vgId = pSyncInfo->vgId;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
364
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
365 366
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);

M
Minghao Li 已提交
367
  pSyncNode->pWal = pSyncInfo->pWal;
S
Shengliang Guan 已提交
368
  pSyncNode->msgcb = pSyncInfo->msgcb;
M
Minghao Li 已提交
369
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
370
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
371

M
Minghao Li 已提交
372 373 374 375
  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  assert(pSyncNode->pRaftCfg != NULL);

M
Minghao Li 已提交
376
  // init internal
M
Minghao Li 已提交
377 378
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
379

M
Minghao Li 已提交
380
  // init peersNum, peers, peersId
M
Minghao Li 已提交
381
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
M
Minghao Li 已提交
382
  int j = 0;
M
Minghao Li 已提交
383 384 385
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
M
Minghao Li 已提交
386 387 388
      j++;
    }
  }
M
Minghao Li 已提交
389
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
390
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
391
  }
M
Minghao Li 已提交
392

M
Minghao Li 已提交
393
  // init replicaNum, replicasId
M
Minghao Li 已提交
394 395 396
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
M
Minghao Li 已提交
397 398
  }

M
Minghao Li 已提交
399
  // init raft algorithm
M
Minghao Li 已提交
400
  pSyncNode->pFsm = pSyncInfo->pFsm;
M
Minghao Li 已提交
401
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
M
Minghao Li 已提交
402 403
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
404
  // init life cycle outside
M
Minghao Li 已提交
405

M
Minghao Li 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
430
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
431
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
432
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
433 434
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
435
  // init TLA+ candidate vars
M
Minghao Li 已提交
436 437 438 439 440
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
441 442 443 444 445 446 447 448 449
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
450
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
451

M
Minghao Li 已提交
452 453 454 455 456
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

M
Minghao Li 已提交
457
  // init ping timer
M
Minghao Li 已提交
458
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
459
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
M
Minghao Li 已提交
460 461
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
462
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
463
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
464

M
Minghao Li 已提交
465 466
  // init elect timer
  pSyncNode->pElectTimer = NULL;
M
Minghao Li 已提交
467
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
468 469
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
470
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
471 472 473 474
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
M
Minghao Li 已提交
475
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
M
Minghao Li 已提交
476 477
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
478
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
479 480
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
481
  // init callback
M
Minghao Li 已提交
482 483
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
484
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
485 486 487 488
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
489
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
490

M
Minghao Li 已提交
491 492 493 494 495
  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
  assert(pSyncNode->pSyncRespMgr != NULL);

  // start in syncNodeStart
M
Minghao Li 已提交
496
  // start raft
M
Minghao Li 已提交
497
  // syncNodeBecomeFollower(pSyncNode);
M
Minghao Li 已提交
498

M
Minghao Li 已提交
499 500 501
  return pSyncNode;
}

M
Minghao Li 已提交
502 503
// Start raft on an opened node.
// A single-replica group becomes leader immediately, appends a no-op entry and
// tries to advance the commit index. Any larger group starts as a follower.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode);
    // The ping timer is intentionally not started here (it was test-only):
    // syncNodeStartPingTimer(pSyncNode);
    return;
  }

  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change become leader immediately==", pSyncNode);

  // Raft 3.6.2 Committing entries from previous terms:
  // a fresh leader appends a no-op so earlier entries can become committed.
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica
}

M
Minghao Li 已提交
525 526 527 528 529 530 531 532 533 534 535
// Enter stand-by: become a follower, stop heartbeats, and push the election
// timer out to TIMER_MAX_MS (presumably so the node does not campaign soon).
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
536
// Release every resource owned by pSyncNode except the SSyncNode struct
// itself, which is freed later in syncFreeNode.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);

  // Stop all timers FIRST. Their callbacks are armed with pSyncNode as the
  // timer parameter (see syncNodeStart*Timer). Stopping them only after the
  // members below were destroyed left a window where a firing timer could
  // touch freed state.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);
  pSyncNode->pRaftStore = NULL;

  // Clear each pointer after destroying it so a late access fails on NULL
  // instead of silently reading freed memory.
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  // NOTE(review): pFsm is copied from SSyncInfo in syncNodeOpen; freeing it
  // here assumes ownership was transferred to the node — confirm with callers.
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }
}

M
Minghao Li 已提交
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
// ping --------------
// Encode pMsg as an rpc message and send it to destRaftId.
// Returns the syncNodeSendMsgById() result.
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg rpc;
  syncPing2RpcMsg(pMsg, &rpc);
  syncRpcMsgLog2((char*)"==syncNodePing==", &rpc);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpc);
}

// Send a ping from this node to itself.
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SRaftId*  pSelf = &pSyncNode->myRaftId;
  SyncPing* pPing = syncPingBuild3(pSelf, pSelf, pSyncNode->vgId);

  int32_t code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);

  syncPingDestroy(pPing);
  return code;
}

// Ping every peer (all replicas except this node).
// Returns the result of the last ping, or 0 when there are no peers.
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t code = 0;
  for (int peer = 0; peer < pSyncNode->peersNum; peer++) {
    SRaftId*  pDest = pSyncNode->peersId + peer;
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);
    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
  return code;
}

// Ping every replica in the raft config, including this node.
// Returns the result of the last ping, or 0 when replicaNum is 0.
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t code = 0;
  for (int replica = 0; replica < pSyncNode->pRaftCfg->cfg.replicaNum; replica++) {
    SRaftId*  pDest = pSyncNode->replicasId + replica;
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);
    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
  return code;
}

// timer control --------------
// (Re)arm the ping timer for pingTimerMS and sync its logic clock with the
// user clock (presumably compared by the callback to discard stale timers).
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer. The user logic clock is bumped first so it no longer
// matches the armed clock. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

// Record `ms` as the election timeout, (re)arm the election timer with it, and
// sync the election logic clock with the user clock. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return 0;
}

// Stop the election timer, bumping the user logic clock first so it no longer
// matches the armed clock. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

// Stop then re-arm the election timer with a timeout of `ms`.
// Results of the stop/start helpers are ignored; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
651 652
// Restart the election timer with a fresh random timeout drawn by
// syncUtilElectRandomMS between electBaseLine and 2 * electBaseLine.
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  return syncNodeRestartElectTimer(
      pSyncNode, syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine));
}

M
Minghao Li 已提交
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677
// (Re)arm the heartbeat timer for heartbeatTimerMS and sync its logic clock
// with the user clock. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return 0;
}

// Stop the heartbeat timer, bumping the user logic clock first so it no longer
// matches the armed clock. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return 0;
}

// utils --------------
// Send pMsg to the node identified by destRaftId through pSyncNode->FpSendMsg.
// Before sending, pMsg->pCont is converted in place with syncUtilMsgHtoN
// (host to network order) and the message is flagged info.noResp = 1.
// If no send callback is installed, only a trace line is logged.
// NOTE(review): on that path pMsg->pCont is not released here; confirm who owns it.
// Always returns 0.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // htonl
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the endpoint described by nodeInfo through pSyncNode->FpSendMsg.
// Before sending, pMsg->pCont is converted in place with syncUtilMsgHtoN
// (host to network order) and the message is flagged info.noResp = 1.
// If no send callback is installed, only a trace line is logged.
// NOTE(review): on that path pMsg->pCont is not released here; confirm who owns it.
// Always returns 0.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  syncUtilMsgHtoN(pMsg->pCont);  // htonl
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
705 706 707 708
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
  char   u64buf[128];
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
709 710 711
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
712
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
713
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
714 715 716
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
717 718 719
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
720
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
721 722 723 724
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
725
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
747

M
Minghao Li 已提交
748 749 750 751 752 753
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
754

M
Minghao Li 已提交
755 756 757 758 759 760 761
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
762 763 764 765
    // life cycle
    snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
766 767 768
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
769
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
770 771 772 773 774 775 776 777 778 779 780

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
781
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
782 783
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
784 785 786 787 788
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
789 790 791 792
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
793
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
794
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
795
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
796 797 798
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
799
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
800 801 802 803 804 805
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
806
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
807
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
808
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
809 810 811
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
812
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
813 814 815 816 817 818
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
819
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
820
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
821
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
822 823 824
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
825
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
843 844 845 846 847 848 849
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
850 851 852 853 854 855 856
// Render the node as pretty-printed JSON (via syncNode2Json + cJSON_Print).
// The intermediate cJSON tree is released here. The returned string belongs to
// the caller and may be NULL if cJSON_Print fails.
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
857 858 859 860 861 862 863 864 865 866 867 868 869
// Build a one-line summary of the node for logs.
// It contains the vgId, current term, commit index, state (numeric and name),
// and the election-timer logic clocks and interval.
//
// Returns a 256-byte buffer from taosMemoryMalloc (presumably released by the
// caller with taosMemoryFree), or NULL if the allocation fails.
// SyncTerm and the timer clocks are unsigned 64-bit and commitIndex is signed
// 64-bit, so the <inttypes.h> macros are used instead of %lu/%ld. Those are
// only correct where long is 64-bit.
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
  if (s == NULL) {
    return NULL;
  }
  snprintf(s, len,
           "syncNode2SimpleStr vgId:%d currentTerm:%" PRIu64 ", commitIndex:%" PRId64
           ", state:%d %s, electTimerLogicClock:%" PRIu64 ", "
           "electTimerLogicClockUser:%" PRIu64 ", "
           "electTimerMS:%d",
           pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
           syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser,
           pSyncNode->electTimerMS);
  return s;
}

M
Minghao Li 已提交
870
// Install newConfig as the node's raft configuration and persist it (persisting
// must succeed; checked with ASSERT). Then rebuild all membership-derived state:
//   - my node info and raft id;
//   - the peer list (every replica except myIndex, in configuration order);
//   - the full replica id list;
//   - the next/match index managers.
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
  pSyncNode->pRaftCfg->cfg = *newConfig;
  int32_t persistCode = raftCfgPersist(pSyncNode->pRaftCfg);
  ASSERT(persistCode == 0);

  SSyncCfg* pCfg = &pSyncNode->pRaftCfg->cfg;

  // myself
  pSyncNode->myNodeInfo = pCfg->nodeInfo[pCfg->myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

  // peers: copy every replica other than myself, then derive their raft ids
  pSyncNode->peersNum = pCfg->replicaNum - 1;
  int peerCount = 0;
  for (int i = 0; i < pCfg->replicaNum; ++i) {
    if (i == pCfg->myIndex) {
      continue;
    }
    pSyncNode->peersNodeInfo[peerCount] = pCfg->nodeInfo[i];
    ++peerCount;
  }
  for (int p = 0; p < pSyncNode->peersNum; ++p) {
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[p], pSyncNode->vgId, &pSyncNode->peersId[p]);
  }

  // replicas: the full membership, myself included
  pSyncNode->replicaNum = pCfg->replicaNum;
  for (int r = 0; r < pCfg->replicaNum; ++r) {
    syncUtilnodeInfo2raftId(&pCfg->nodeInfo[r], pSyncNode->vgId, &pSyncNode->replicasId[r]);
  }

  syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
  syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);

  syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}

M
Minghao Li 已提交
904 905 906 907 908 909 910 911 912 913 914
// Take a reference on the node registered under `rid` in the tsNodeRefId pool.
// Returns the node, or NULL (with a trace line) if it cannot be acquired.
// Pair every successful call with syncNodeRelease.
SSyncNode* syncNodeAcquire(int64_t rid) {
  SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
  if (pNode != NULL) {
    return pNode;
  }

  sTrace("failed to acquire node from refId:%" PRId64, rid);
  return NULL;
}

// Drop one reference on pNode in the tsNodeRefId pool (counterpart of syncNodeAcquire).
void syncNodeRelease(SSyncNode* pNode) {
  int64_t rid = pNode->rid;
  taosReleaseRef(tsNodeRefId, rid);
}

M
Minghao Li 已提交
915 916 917 918 919 920 921 922 923 924
// raft state change --------------
// Raft term rule: on seeing a strictly newer term, adopt it, step down to
// follower and forget any vote cast in the old term. Otherwise do nothing.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
  syncNodeBecomeFollower(pSyncNode);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

// Switch to the follower role.
//   - If this node was the leader, clear the cached leader id (it named ourselves).
//   - Set the follower state and stop heartbeats.
//   - Re-arm the election timer with a fresh randomized timeout.
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
// Switch to the leader role (TLA+ BecomeLeader above).
//   - Set the leader state and cache ourselves as the leader.
//   - Reset every nextIndex entry to lastLogIndex + 1.
//   - Reset every matchIndex entry to SYNC_INDEX_INVALID.
//   - Stop the election timer, replicate immediately, then start heartbeats.
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // Nothing below touches the log before the loops finish, so the last index
  // is loop-invariant. Read it once instead of once per replica.
  SyncIndex nextIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

  // The entry for myself is overwritten too, which is harmless.
  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = nextIndex;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start replicate right now!
  syncNodeReplicate(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
}

// Candidate -> leader transition. It requires the candidate state and a
// majority of granted votes (both asserted).
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 (committing entries from previous terms):
  //   - Append a no-op of the new term directly to the log. syncNodeEqNoop is
  //     deliberately not used here.
  //   - Try to advance commitIndex right away, in case this node is the only replica.
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);
}

// Follower -> candidate transition. It only flips the state (the follower
// state is asserted); timers and votes are not touched here.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
}

// Leader -> follower transition (leader state asserted). Delegates to
// syncNodeBecomeFollower, then logs the change.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}

// Candidate -> follower transition (candidate state asserted). Delegates to
// syncNodeBecomeFollower, then logs the change.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}

// raft vote --------------
M
Minghao Li 已提交
1024 1025 1026

// Only called by syncNodeVoteForSelf. Its preconditions (term is the current
// term, no vote cast yet in it) are enforced with assert.
M
Minghao Li 已提交
1027 1028 1029 1030 1031 1032 1033
// Record a vote for pRaftId in the raft store.
// `term` must equal the current term, and no vote may have been cast yet in
// that term (both asserted).
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  // persist the vote
  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1034
// Vote for ourselves and count it as if a granted vote reply had arrived from a peer.
M
Minghao Li 已提交
1035 1036 1037
// Cast this node's vote for itself in the current term. Then feed a synthetic
// granted SyncRequestVoteReply (src = dest = myRaftId) into the granted and
// respond vote tallies, exactly as a peer's reply would be.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pSelfReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pSelfReply->srcId = pSyncNode->myRaftId;
  pSelfReply->destId = pSyncNode->myRaftId;
  pSelfReply->term = pSyncNode->pRaftStore->currentTerm;
  pSelfReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pSelfReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pSelfReply);

  syncRequestVoteReplyDestroy(pSelfReply);
}

M
Minghao Li 已提交
1049 1050 1051 1052 1053
// for debug --------------
// Debug helper: print the node's JSON form (syncNode2Str) to stdout and flush.
// strlen returns size_t, so the length is printed with %zu (%lu is wrong where
// size_t is not unsigned long). If serialization fails nothing is printed,
// because strlen(NULL) is undefined behavior.
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  printf("syncNodePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print caller tag `s` plus the node's JSON form to stdout and flush.
// The size_t length is printed with %zu. If serialization fails nothing is
// printed, because strlen(NULL) is undefined behavior.
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  printf("syncNodePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Trace-log the node's JSON form via sTraceLong.
// The size_t length is logged with %zu. If serialization fails nothing is
// logged, because strlen(NULL) is undefined behavior.
void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  sTraceLong("syncNodeLog | len:%zu | %s", strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Trace-log caller tag `s` plus the node's JSON form via sTraceLong.
// The size_t length is logged with %zu. If serialization fails nothing is
// logged, because strlen(NULL) is undefined behavior.
void syncNodeLog2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  sTraceLong("syncNodeLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
1076
// ------ local function ---------
M
Minghao Li 已提交
1077
// enqueue message ----
M
Minghao Li 已提交
1078 1079
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
1080
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
1081
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
1082
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1083 1084
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1085
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
1086
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1087
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1088 1089 1090
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1091 1092
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1093
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1094 1095
                 &pSyncNode->pPingTimer);
  } else {
1096
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1097
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1098 1099 1100 1101 1102 1103 1104
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
M
Minghao Li 已提交
1105
                                              pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1106
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
1107
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1108
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
1109
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1110
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1111 1112 1113
    } else {
      sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1114 1115
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1116
    // reset timer ms
M
Minghao Li 已提交
1117
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
1118 1119
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
M
Minghao Li 已提交
1120
  } else {
1121
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1122
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1123 1124 1125
  }
}

M
Minghao Li 已提交
1126 1127 1128 1129 1130 1131
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
M
Minghao Li 已提交
1132
                          pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1133 1134
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1135
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
1136
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1137
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1138 1139 1140
    } else {
      sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1141 1142
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1143
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1144 1145
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
1146 1147
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
1148 1149 1150 1151
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
1152 1153 1154 1155 1156 1157
// Leader only (asserted): build a no-op entry for the next log index in the
// current term and wrap its serialized form in a SyncClientRequest. That request
// is enqueued through FpEqMsg, or only traced if no enqueue callback is set.
// All temporaries (serialized buffer, request, entry) are released here.
// Always returns 0.
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  assert(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  assert(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  assert(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->FpEqMsg != NULL) {
    ths->FpEqMsg(ths->msgcb, &rpcMsg);
  } else {
    sTrace("syncNodeEqNoop pSyncMsg->FpEqMsg is NULL" + 0 == NULL ? "" : "syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
  }

  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);
  // Only the serialized copy travels in the request, so the entry itself must
  // be released here. Previously it leaked; syncNodeAppendNoop destroys it the same way.
  syncEntryDestory(pEntry);

  return ret;
}

// Build a no-op entry for the next log index in the current term.
// If this node is leader, append it directly to the local log store and
// replicate immediately. The entry object is released either way.
// Always returns 0.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       nextIndex = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        curTerm = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  assert(pNoop != NULL);

  bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER);
  if (isLeader) {
    ths->pLogStore->appendEntry(ths->pLogStore, pNoop);
    syncNodeReplicate(ths);
  }

  syncEntryDestory(pNoop);
  return 0;
}

M
Minghao Li 已提交
1198
// on message ----
M
Minghao Li 已提交
1199 1200 1201 1202 1203 1204 1205 1206 1207
// Handle an incoming ping.
//   - Log the node's current state together with the message.
//   - Build a SyncPingReply addressed back to the sender (pMsg->srcId).
//   - Send it with syncNodeSendMsgById.
// The reply object is destroyed afterwards; syncPingReply2RpcMsg has already
// serialized it into rpcMsg. Always returns 0.
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  int32_t ret = 0;

  // log state
  // term and the timer clocks are uint64_t: use PRIu64 rather than %lu.
  char logBuf[1024];
  snprintf(logBuf, sizeof(logBuf),
           "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
           ", electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
           ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
           ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);
  syncPingLog2(logBuf, pMsg);

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);

  // syncNodeSendMsgById performs the host-to-network conversion itself.
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // Previously leaked. destId is read above, so destroy only after sending.
  syncPingReplyDestroy(pMsgReply);

  return ret;
}

M
Minghao Li 已提交
1226
// Handle a ping reply. Nothing to do beyond logging it. Always returns 0.
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return 0;
}
M
Minghao Li 已提交
1231

M
Minghao Li 已提交
1232 1233 1234 1235 1236 1237 1238 1239 1240 1241
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
1242
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
M
Minghao Li 已提交
1243 1244 1245
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
1246 1247 1248 1249 1250
  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
1251 1252
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1253 1254

    // start replicate right now!
M
Minghao Li 已提交
1255
    syncNodeReplicate(ths);
M
Minghao Li 已提交
1256

M
Minghao Li 已提交
1257 1258 1259 1260
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1261
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1262
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1263
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1264 1265 1266 1267 1268 1269 1270
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1271
      }
M
Minghao Li 已提交
1272 1273 1274
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
1275 1276 1277
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
1278
  } else {
M
Minghao Li 已提交
1279 1280 1281 1282
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1283
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1284
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1285
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1286 1287 1288 1289 1290 1291 1292
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 1;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1293
      }
M
Minghao Li 已提交
1294 1295
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1296 1297
  }

M
Minghao Li 已提交
1298
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1299
  return ret;
1300
}
M
Minghao Li 已提交
1301 1302 1303

static void syncFreeNode(void* param) {
  SSyncNode* pNode = param;
M
Minghao Li 已提交
1304 1305
  // inner object already free
  // syncNodePrint2((char*)"==syncFreeNode==", pNode);
M
Minghao Li 已提交
1306

wafwerar's avatar
wafwerar 已提交
1307
  taosMemoryFree(pNode);
M
Minghao Li 已提交
1308
}
S
Shengliang Guan 已提交
1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320

// Map a sync state to its upper-case display name. Any other value maps to "ERROR".
const char* syncStr(ESyncState state) {
  if (state == TAOS_SYNC_STATE_FOLLOWER) {
    return "FOLLOWER";
  }
  if (state == TAOS_SYNC_STATE_CANDIDATE) {
    return "CANDIDATE";
  }
  if (state == TAOS_SYNC_STATE_LEADER) {
    return "LEADER";
  }
  return "ERROR";
}