syncMain.c 46.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncTimeout.h"
M
Minghao Li 已提交
33
#include "syncUtil.h"
M
Minghao Li 已提交
34
#include "syncVoteMgr.h"
M
Minghao Li 已提交
35
#include "tref.h"
M
Minghao Li 已提交
36

M
Minghao Li 已提交
37 38 39
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
40
// enqueue message ----
M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
// process message ----
M
Minghao Li 已提交
48 49 50
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
51 52 53

// life cycle
static void syncFreeNode(void* param);
M
Minghao Li 已提交
54 55 56
// ---------------------------------

// Initialize the sync module: open the node reference set and start the sync
// environment. Nothing is done when the environment is already started.
//
// Returns 0 on success (or when already started), -1 when the reference set
// cannot be opened, otherwise the result of syncEnvStart().
int32_t syncInit() {
  if (syncEnvIsStart()) {
    return 0;
  }

  tsNodeRefId = taosOpenRef(200, syncFreeNode);
  if (tsNodeRefId < 0) {
    sError("failed to init node ref");
    syncCleanUp();
    return -1;
  }

  int32_t ret = syncEnvStart();
  if (ret != 0) {
    // The environment did not come up: close the reference set opened above so
    // that a later syncInit() call does not open a second one and leak this one.
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }

  return ret;
}
M
Minghao Li 已提交
72

M
Minghao Li 已提交
73 74 75
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
M
Minghao Li 已提交
76 77 78 79 80

  if (tsNodeRefId != -1) {
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }
M
Minghao Li 已提交
81
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
84 85
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
  syncNodeLog2("syncNodeOpen open success", pSyncNode);

M
Minghao Li 已提交
89 90 91 92 93 94 95
  pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
  if (pSyncNode->rid < 0) {
    syncFreeNode(pSyncNode);
    return -1;
  }

  return pSyncNode->rid;
M
Minghao Li 已提交
96
}
M
Minghao Li 已提交
97

M
Minghao Li 已提交
98 99 100 101 102 103 104 105 106 107
void syncStart(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStart(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
108 109 110 111 112 113 114 115 116 117
void syncStartStandBy(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStartStandBy(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
118
void syncStop(int64_t rid) {
M
Minghao Li 已提交
119 120 121 122
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
123
  syncNodeClose(pSyncNode);
M
Minghao Li 已提交
124 125 126

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  taosRemoveRef(tsNodeRefId, rid);
M
Minghao Li 已提交
127
}
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129 130
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  int32_t ret = 0;
M
Minghao Li 已提交
131
  char*   configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
M
Minghao Li 已提交
132 133
  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
S
Shengliang Guan 已提交
134
  rpcMsg.info.noResp = 1;
M
Minghao Li 已提交
135 136 137 138 139
  rpcMsg.contLen = strlen(configChange) + 1;
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
  taosMemoryFree(configChange);
  ret = syncPropose(rid, &rpcMsg, false);
M
Minghao Li 已提交
140 141
  return ret;
}
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143 144 145 146
// Thin alias for syncPropose(): forwards all arguments and returns its result.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150 151 152 153
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
154 155 156 157
  ESyncState state = pSyncNode->state;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return state;
M
Minghao Li 已提交
158 159
}

M
Minghao Li 已提交
160 161 162 163 164
// Return the string form (via syncUtilState2String) of syncGetMyRole(rid).
const char* syncGetMyRoleStr(int64_t rid) {
  return syncUtilState2String(syncGetMyRole(rid));
}

M
Minghao Li 已提交
165 166 167 168 169 170 171 172 173 174 175 176
int32_t syncGetVgId(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  int32_t vgId = pSyncNode->vgId;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return vgId;
}

M
Minghao Li 已提交
177
SyncTerm syncGetMyTerm(int64_t rid) {
M
Minghao Li 已提交
178 179
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
180 181 182 183 184 185 186 187 188
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return term;
}

M
Minghao Li 已提交
189 190 191 192 193 194 195 196 197
// Fill pEpSet with the fqdn/port of every replica in the raft configuration of
// the sync node registered under rid, and set inUse to the node's own index.
// When rid cannot be resolved, pEpSet is zeroed instead.
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  assert(rid == pNode->rid);

  pEpSet->numOfEps = 0;
  int idx = 0;
  // NOTE(review): replicaNum is not checked against the capacity of pEpSet->eps here.
  while (idx < pNode->pRaftCfg->cfg.replicaNum) {
    snprintf(pEpSet->eps[idx].fqdn, sizeof(pEpSet->eps[idx].fqdn), "%s",
             (pNode->pRaftCfg->cfg.nodeInfo)[idx].nodeFqdn);
    pEpSet->eps[idx].port = (pNode->pRaftCfg->cfg.nodeInfo)[idx].nodePort;
    (pEpSet->numOfEps)++;

    sInfo("syncGetEpSet index:%d %s:%d", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);
    ++idx;
  }

  pEpSet->inUse = pNode->pRaftCfg->cfg.myIndex;
  sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);

  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
// Look up the response stub stored under index in the node's response manager
// and, when syncRespMgrGet() reports 1, copy its rpc message into *msg.
//
// Returns the result of syncRespMgrGet(), or TAOS_SYNC_STATE_ERROR when rid
// cannot be resolved.
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pNode->rid);

  SRespStub found;
  int32_t   code = syncRespMgrGet(pNode->pSyncRespMgr, index, &found);
  if (code == 1) {
    *msg = found.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return code;
}

// Same lookup as syncGetRespRpc(), but through syncRespMgrGetAndDel(); when
// that call reports 1, the stub's rpc message is copied into *msg.
//
// Returns the result of syncRespMgrGetAndDel(), or TAOS_SYNC_STATE_ERROR when
// rid cannot be resolved.
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pNode->rid);

  SRespStub found;
  int32_t   code = syncRespMgrGetAndDel(pNode->pSyncRespMgr, index, &found);
  if (code == 1) {
    *msg = found.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return code;
}

245
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
M
Minghao Li 已提交
246 247 248 249 250 251
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
    return;
  }
  assert(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
252
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

// Return the string produced by syncNode2SimpleStr() for the sync node
// registered under rid, or NULL when rid cannot be resolved.
// NOTE(review): ownership of the returned string is decided by
// syncNode2SimpleStr(); presumably the caller must free it - confirm.
char* sync2SimpleStr(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    // The message now names this function (it said "syncSetRpc"), and rid is
    // printed via long long because %ld does not match int64_t on every platform.
    sTrace("sync2SimpleStr get pSyncNode is NULL, rid:%lld", (long long)rid);
    return NULL;
  }
  assert(rid == pSyncNode->rid);
  char* s = syncNode2SimpleStr(pSyncNode);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->electBaseLine = electTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
M
Minghao Li 已提交
306 307
  sTrace("syncPropose msgType:%d ", pMsg->msgType);

M
Minghao Li 已提交
308 309 310 311
  int32_t    ret = TAOS_SYNC_PROPOSE_SUCCESS;
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
M
Minghao Li 已提交
312 313 314
  }
  assert(rid == pSyncNode->rid);

M
Minghao Li 已提交
315
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
M
Minghao Li 已提交
316 317 318 319 320 321
    SRespStub stub;
    stub.createTime = taosGetTimestampMs();
    stub.rpcMsg = *pMsg;
    uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
M
Minghao Li 已提交
322 323
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
324
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
325
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
326 327 328
    } else {
      sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
329
    syncClientRequestDestroy(pSyncMsg);
M
Minghao Li 已提交
330
    ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
331

M
Minghao Li 已提交
332
  } else {
M
Minghao Li 已提交
333
    sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
334
    ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
M
Minghao Li 已提交
335
  }
M
Minghao Li 已提交
336 337 338 339 340

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

M
Minghao Li 已提交
341
// open/close --------------
M
Minghao Li 已提交
342
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
wafwerar's avatar
wafwerar 已提交
343
  SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
M
Minghao Li 已提交
344
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
345
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
346

M
Minghao Li 已提交
347 348 349 350 351 352 353 354 355 356 357 358
  int32_t ret = 0;
  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      return NULL;
    }

    // create raft config file
    snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
    ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath);
    assert(ret == 0);
M
Minghao Li 已提交
359 360
  }

M
Minghao Li 已提交
361
  // init by SSyncInfo
M
Minghao Li 已提交
362 363
  pSyncNode->vgId = pSyncInfo->vgId;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
364
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
365 366
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);

M
Minghao Li 已提交
367
  pSyncNode->pWal = pSyncInfo->pWal;
S
Shengliang Guan 已提交
368
  pSyncNode->msgcb = pSyncInfo->msgcb;
M
Minghao Li 已提交
369
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
370
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
371

M
Minghao Li 已提交
372 373 374 375
  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  assert(pSyncNode->pRaftCfg != NULL);

M
Minghao Li 已提交
376
  // init internal
M
Minghao Li 已提交
377 378
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
379

M
Minghao Li 已提交
380
  // init peersNum, peers, peersId
M
Minghao Li 已提交
381
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
M
Minghao Li 已提交
382
  int j = 0;
M
Minghao Li 已提交
383 384 385
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
M
Minghao Li 已提交
386 387 388
      j++;
    }
  }
M
Minghao Li 已提交
389
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
390
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
391
  }
M
Minghao Li 已提交
392

M
Minghao Li 已提交
393
  // init replicaNum, replicasId
M
Minghao Li 已提交
394 395 396
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
M
Minghao Li 已提交
397 398
  }

M
Minghao Li 已提交
399
  // init raft algorithm
M
Minghao Li 已提交
400
  pSyncNode->pFsm = pSyncInfo->pFsm;
M
Minghao Li 已提交
401
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
M
Minghao Li 已提交
402 403
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
404
  // init life cycle outside
M
Minghao Li 已提交
405

M
Minghao Li 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
430
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
431
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
432
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
433 434
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
435
  // init TLA+ candidate vars
M
Minghao Li 已提交
436 437 438 439 440
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
441 442 443 444 445 446 447 448 449
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
450
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
451

M
Minghao Li 已提交
452 453 454 455 456
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

M
Minghao Li 已提交
457
  // init ping timer
M
Minghao Li 已提交
458
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
459
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
M
Minghao Li 已提交
460 461
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
462
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
463
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
464

M
Minghao Li 已提交
465 466
  // init elect timer
  pSyncNode->pElectTimer = NULL;
M
Minghao Li 已提交
467
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
468 469
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
470
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
471 472 473 474
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
M
Minghao Li 已提交
475
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
M
Minghao Li 已提交
476 477
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
478
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
479 480
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
481
  // init callback
M
Minghao Li 已提交
482 483
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
484
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
485 486 487 488
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
489
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
490

M
Minghao Li 已提交
491 492 493 494
  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
  assert(pSyncNode->pSyncRespMgr != NULL);

495 496 497 498 499 500 501
  // restore state
  pSyncNode->restoreFinish = false;
  pSyncNode->pSnapshot = NULL;
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
  }
502
  //tsem_init(&(pSyncNode->restoreSem), 0, 0);
503

M
Minghao Li 已提交
504
  // start in syncNodeStart
M
Minghao Li 已提交
505
  // start raft
M
Minghao Li 已提交
506
  // syncNodeBecomeFollower(pSyncNode);
M
Minghao Li 已提交
507

M
Minghao Li 已提交
508 509 510
  return pSyncNode;
}

M
Minghao Li 已提交
511 512
// Start raft on the node.
//
// With more than one replica the node simply becomes a follower. With a single
// replica it becomes leader immediately, appends a no-op entry and tries to
// advance the commit index (Raft 3.6.2, committing entries from previous
// terms). Waiting for restoreFinish is currently disabled in both paths.
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode);
    sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
    return;
  }

  // single replica: no election is needed
  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change become leader immediately==", pSyncNode);

  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica

  sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
}

M
Minghao Li 已提交
561 562 563 564 565 566 567 568 569 570 571
// Put the node into stand-by: force follower state, stop the heartbeat timer
// and restart the election timer with TIMER_MAX_MS.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t standByElectMS = TIMER_MAX_MS;
  int32_t code = syncNodeRestartElectTimer(pSyncNode, standByElectMS);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
572
// Release everything syncNodeOpen() created for the node: stop the timers,
// close the raft store, destroy the response/vote/index/log managers, close the
// raft config and free the FSM and snapshot buffers.
//
// The SSyncNode structure itself is not freed here; that happens in
// syncFreeNode().
void syncNodeClose(SSyncNode* pSyncNode) {
  int32_t ret;
  assert(pSyncNode != NULL);

  // Stop the timers before tearing anything down, so that no timer is still
  // armed against a node whose stores and managers have already been destroyed.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  votesRespondDestory(pSyncNode->pVotesRespond);
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  logStoreDestory(pSyncNode->pLogStore);
  raftCfgClose(pSyncNode->pRaftCfg);

  // The node outlives this call (it is freed in syncFreeNode), so clear the
  // freed pointers instead of leaving them dangling.
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  if (pSyncNode->pSnapshot != NULL) {
    taosMemoryFree(pSyncNode->pSnapshot);
    pSyncNode->pSnapshot = NULL;
  }

  // free memory in syncFreeNode
}

M
Minghao Li 已提交
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
// ping --------------
// Log pMsg, convert it to an rpc message, log that as well and send it to
// destRaftId. Returns the result of syncNodeSendMsgById().
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg pingRpc;
  syncPing2RpcMsg(pMsg, &pingRpc);
  syncRpcMsgLog2((char*)"==syncNodePing==", &pingRpc);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &pingRpc);
}

// Build a ping addressed from this node to itself, send it and destroy it.
// Returns the result of syncNodePing(), which is asserted to be 0.
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pSelfPing = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId);

  int32_t code = syncNodePing(pSyncNode, &pSelfPing->destId, pSelfPing);
  assert(code == 0);

  syncPingDestroy(pSelfPing);
  return code;
}

// Send one ping to every peer (all replicas except this node). Each send is
// asserted to return 0. Returns the result of the last send, or 0 when there
// are no peers.
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t code = 0;
  int     peer = 0;
  while (peer < pSyncNode->peersNum) {
    SRaftId*  pDest = &(pSyncNode->peersId[peer]);
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);

    syncPingDestroy(pPing);
    ++peer;
  }
  return code;
}

// Send one ping to every replica in the raft configuration, this node
// included. Each send is asserted to return 0. Returns the result of the last
// send, or 0 when the configuration holds no replica.
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t code = 0;
  int     replica = 0;
  while (replica < pSyncNode->pRaftCfg->cfg.replicaNum) {
    SRaftId*  pDest = &(pSyncNode->replicasId[replica]);
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);

    syncPingDestroy(pPing);
    ++replica;
  }
  return code;
}

// timer control --------------
// (Re)arm the ping timer with pingTimerMS, then copy the user logic clock into
// the timer logic clock. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Advance the ping user logic clock by one, stop the ping timer and clear its
// handle. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

// Store ms as the election timeout, (re)arm the election timer with it, then
// copy the user logic clock into the timer logic clock. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return 0;
}

// Advance the election user logic clock by one, stop the election timer and
// clear its handle. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

// Stop the election timer and start it again with a timeout of ms.
// The results of the two calls are not inspected; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
693 694
// Restart the election timer with a fresh random timeout drawn by
// syncUtilElectRandomMS() from the election base line and twice that value.
// Returns the result of syncNodeRestartElectTimer().
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t randomMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  return syncNodeRestartElectTimer(pSyncNode, randomMS);
}

M
Minghao Li 已提交
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
// (Re)arm the heartbeat timer with heartbeatTimerMS, then copy the user logic
// clock into the timer logic clock. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return 0;
}

// Advance the heartbeat user logic clock by one, stop the heartbeat timer and
// clear its handle. Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return 0;
}

// utils --------------
// Send pMsg to the node identified by destRaftId through pSyncNode->FpSendMsg.
//   destRaftId - destination; converted to an endpoint set with syncUtilraftId2EpSet.
//   pMsg       - message to send; its content is passed through syncUtilMsgHtoN and info.noResp is set to 1
//                before sending.
// If no send callback is installed the message is not sent and only a trace line is written.
// Always returns 0.
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  // host -> network conversion of the message content
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

// Send pMsg to the node described by nodeInfo through pSyncNode->FpSendMsg.
//   nodeInfo - destination; converted to an endpoint set with syncUtilnodeInfo2EpSet.
//   pMsg     - message to send; its content is passed through syncUtilMsgHtoN and info.noResp is set to 1
//              before sending.
// If no send callback is installed the message is not sent and only a trace line is written.
// Always returns 0.
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  // host -> network conversion of the message content
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
747 748 749 750
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
  char   u64buf[128];
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
751 752 753
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
754
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
755
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
756 757 758
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
759 760 761
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
762
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
763 764 765 766
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
767
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
789

M
Minghao Li 已提交
790 791 792 793 794 795
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
796

M
Minghao Li 已提交
797 798 799 800 801 802 803
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
804 805 806 807
    // life cycle
    snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
808 809 810
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
811
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
812 813 814 815 816 817 818 819 820 821 822

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
823
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
824 825
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
826 827 828 829 830
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
831 832 833 834
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
835
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
836
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
837
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
838 839 840
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
841
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
842 843 844 845 846 847
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
848
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
849
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
850
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
851 852 853
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
854
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
855 856 857 858 859 860
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
861
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
862
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
863
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
864 865 866
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
867
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
885 886 887 888 889 890 891
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
892 893 894 895 896 897 898
// Render pSyncNode as JSON text: build the tree with syncNode2Json, print it with cJSON_Print and
// delete the tree. The caller owns the returned string (callers in this file release it with
// taosMemoryFree).
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
899 900 901 902 903 904 905 906 907 908 909 910 911
// Build a short one-line summary of pSyncNode (vgId, current term, commit index, state and the
// election-timer clocks).
//
// Returns a buffer allocated with taosMemoryMalloc that the caller owns, or NULL when the allocation
// fails. Output longer than the 256-byte buffer is truncated by snprintf.
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
  if (s == NULL) {
    // never hand a NULL destination to snprintf
    return NULL;
  }

  // PRIu64/PRId64 instead of %lu/%ld: these fields are formatted as 64-bit values elsewhere in this
  // file, and long is not 64 bits on every platform.
  snprintf(s, len,
           "syncNode2SimpleStr vgId:%d currentTerm:%" PRIu64 ", commitIndex:%" PRId64
           ", state:%d %s, electTimerLogicClock:%" PRIu64
           ", "
           "electTimerLogicClockUser:%" PRIu64
           ", "
           "electTimerMS:%d",
           pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
           syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser,
           pSyncNode->electTimerMS);
  return s;
}

M
Minghao Li 已提交
912
// Install newConfig as the node's raft configuration and rebuild everything derived from it.
//
// Steps: copy *newConfig into pRaftCfg->cfg, persist it with raftCfgPersist (asserted to succeed),
// recompute myNodeInfo/myRaftId, the peer arrays, the replica id array, and finally refresh the
// next-index and match-index managers.
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
  pSyncNode->pRaftCfg->cfg = *newConfig;
  int32_t code = raftCfgPersist(pSyncNode->pRaftCfg);
  ASSERT(code == 0);

  SSyncCfg* pCfg = &pSyncNode->pRaftCfg->cfg;

  // identity of this node
  pSyncNode->myNodeInfo = pCfg->nodeInfo[pCfg->myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

  // peers: every configured node except this one
  pSyncNode->peersNum = pCfg->replicaNum - 1;
  int peerIdx = 0;
  for (int cfgIdx = 0; cfgIdx < pCfg->replicaNum; ++cfgIdx) {
    if (cfgIdx == pCfg->myIndex) {
      continue;
    }
    pSyncNode->peersNodeInfo[peerIdx] = pCfg->nodeInfo[cfgIdx];
    peerIdx++;
  }
  for (int p = 0; p < pSyncNode->peersNum; ++p) {
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[p], pSyncNode->vgId, &pSyncNode->peersId[p]);
  }

  // replicas: every configured node, this one included
  pSyncNode->replicaNum = pCfg->replicaNum;
  for (int r = 0; r < pCfg->replicaNum; ++r) {
    syncUtilnodeInfo2raftId(&pCfg->nodeInfo[r], pSyncNode->vgId, &pSyncNode->replicasId[r]);
  }

  syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
  syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);

  syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}

M
Minghao Li 已提交
946 947 948 949 950 951 952 953 954 955 956
// Look up the sync node registered under reference id `rid` in the tsNodeRefId reference set.
// Returns the node, or NULL (after writing a trace line) when taosAcquireRef finds nothing.
// Pair a successful call with syncNodeRelease.
SSyncNode* syncNodeAcquire(int64_t rid) {
  SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
  if (pNode != NULL) {
    return pNode;
  }

  sTrace("failed to acquire node from refId:%" PRId64, rid);
  return NULL;
}

// Release a reference previously obtained with syncNodeAcquire, using the node's own rid.
void syncNodeRelease(SSyncNode* pNode) {
  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
957 958 959 960 961 962 963 964 965 966
// raft state change --------------
// Adopt `term` if it is newer than the stored current term.
// On adoption: store the term, step down to follower, then clear the recorded vote.
// A term that is not greater than the current one is ignored.
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
  syncNodeBecomeFollower(pSyncNode);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

// Switch the node to the follower state.
// Stops the heartbeat timer and restarts the election timer with a fresh random timeout.
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  // A node that was leader had cached itself as leader (see syncNodeBecomeLeader); forget that.
  const bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  // Enter the leader state and remember ourselves as the known leader.
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex := last log index + 1 for every replica.
  // This node's own slot may be overwritten as well, which is harmless.
  for (int n = 0; n < pSyncNode->pNextIndex->replicaNum; ++n) {
    pSyncNode->pNextIndex->index[n] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
  }

  // matchIndex := invalid for every replica (own slot included, also harmless).
  for (int m = 0; m < pSyncNode->pMatchIndex->replicaNum; ++m) {
    pSyncNode->pMatchIndex->index[m] = SYNC_INDEX_INVALID;
  }

  // The election timer is stopped here.
  syncNodeStopElectTimer(pSyncNode);

  // Replicate immediately, then start the heartbeat timer.
  syncNodeReplicate(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);
}

// Candidate -> leader transition.
// Preconditions (asserted): the node is a candidate and holds a majority of granted votes.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 (committing entries from previous terms): append a no-op entry directly to the log.
  // Enqueueing it as a client request via syncNodeEqNoop is deliberately not used.
  syncNodeAppendNoop(pSyncNode);

  // With a single replica the entry may be committable right away.
  syncMaybeAdvanceCommitIndex(pSyncNode);
}

// Follower -> candidate transition.
// Precondition (asserted): the node is currently a follower. Only the state field changes here.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
}

// Leader -> follower transition.
// Precondition (asserted): the node is currently the leader.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}

// Candidate -> follower transition.
// Precondition (asserted): the node is currently a candidate.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}

// raft vote --------------
M
Minghao Li 已提交
1066 1067 1068

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1069 1070 1071 1072 1073 1074 1075
// Record a vote for pRaftId in the raft store.
// Preconditions (asserted): `term` equals the stored current term and no vote has been cast yet.
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(pSyncNode->pRaftStore->currentTerm == term);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1076
// simulate get vote from outside
M
Minghao Li 已提交
1077 1078 1079
// Vote for ourselves in the current term and count that vote locally.
// A granted SyncRequestVoteReply from this node to itself is built, fed to the vote-granted and
// votes-respond trackers, and then destroyed; it is never sent.
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pSelfReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pSelfReply->srcId = pSyncNode->myRaftId;
  pSelfReply->destId = pSyncNode->myRaftId;
  pSelfReply->term = pSyncNode->pRaftStore->currentTerm;
  pSelfReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pSelfReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pSelfReply);

  syncRequestVoteReplyDestroy(pSelfReply);
}

M
Minghao Li 已提交
1091 1092 1093 1094 1095
// for debug --------------
// Debug helper: print the JSON dump of pObj to stdout and flush all output streams.
// Nothing is printed when the dump cannot be produced (syncNode2Str returned NULL).
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    // avoid strlen(NULL) / printing a NULL "%s"
    return;
  }

  // strlen() yields size_t; cast and use PRIu64 so the format matches on every platform
  printf("syncNodePrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the JSON dump of pObj to stdout, prefixed with the caller-supplied tag `s`,
// and flush all output streams.
// Nothing is printed when the dump cannot be produced (syncNode2Str returned NULL).
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    // avoid strlen(NULL) / printing a NULL "%s"
    return;
  }

  // strlen() yields size_t; cast and use PRIu64 so the format matches on every platform
  printf("syncNodePrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the JSON dump of pObj to the trace log.
// Nothing is logged when the dump cannot be produced (syncNode2Str returned NULL).
void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    // avoid strlen(NULL) / logging a NULL "%s"
    return;
  }

  // strlen() yields size_t; cast and use PRIu64 so the format matches on every platform
  sTraceLong("syncNodeLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: write the JSON dump of pObj to the trace log, prefixed with the caller-supplied tag `s`.
// Nothing is logged when the dump cannot be produced (syncNode2Str returned NULL).
void syncNodeLog2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    // avoid strlen(NULL) / logging a NULL "%s"
    return;
  }

  // strlen() yields size_t; cast and use PRIu64 so the format matches on every platform
  sTraceLong("syncNodeLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
1118
// ------ local function ---------
M
Minghao Li 已提交
1119
// enqueue message ----
M
Minghao Li 已提交
1120 1121
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
1122
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
1123
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
1124
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1125 1126
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1127
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
1128
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1129
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1130 1131 1132
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1133 1134
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1135
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1136 1137
                 &pSyncNode->pPingTimer);
  } else {
1138
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1139
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1140 1141 1142 1143 1144 1145 1146
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
M
Minghao Li 已提交
1147
                                              pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1148
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
1149
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1150
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
1151
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1152
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1153 1154 1155
    } else {
      sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1156 1157
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1158
    // reset timer ms
M
Minghao Li 已提交
1159
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
1160 1161
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
M
Minghao Li 已提交
1162
  } else {
1163
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1164
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1165 1166 1167
  }
}

M
Minghao Li 已提交
1168 1169 1170 1171 1172 1173
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
M
Minghao Li 已提交
1174
                          pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1175 1176
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1177
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
1178
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1179
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1180 1181 1182
    } else {
      sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1183 1184
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1185
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1186 1187
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
1188 1189
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
1190 1191 1192 1193
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
1194 1195 1196 1197 1198 1199
// Build a no-op raft entry for the next log index and enqueue it as a client request via FpEqMsg.
// Precondition (asserted): the node is the leader.
// All temporaries created here (the entry, its serialized form, the client request) are released
// before returning. Always returns 0.
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  assert(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  assert(pEntry != NULL);

  // wrap the serialized entry into a client-request message
  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  assert(serialized != NULL);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  assert(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->FpEqMsg != NULL) {
    ths->FpEqMsg(ths->msgcb, &rpcMsg);
  } else {
    sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
  }

  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);
  // the entry was only needed to produce `serialized`; release it as syncNodeAppendNoop does
  syncEntryDestory(pEntry);

  return ret;
}

// Build a no-op raft entry for the next log index and, if this node is the leader, append it to the
// log and start replication. The entry is destroyed before returning in either case.
// Always returns 0.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex nextIndex = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm  curTerm = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, nextIndex, ths->vgId);
  assert(pNoop != NULL);

  const bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER);
  if (isLeader) {
    ths->pLogStore->appendEntry(ths->pLogStore, pNoop);
    syncNodeReplicate(ths);
  }

  syncEntryDestory(pNoop);
  return 0;
}

M
Minghao Li 已提交
1240
// on message ----
M
Minghao Li 已提交
1241 1242 1243 1244 1245 1246 1247 1248 1249
// Handle an incoming ping: log it together with a short state summary and answer the sender with a
// ping reply. Always returns 0.
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  // log state
  // PRIu64 instead of %lu: the term and logic clocks are formatted as 64-bit values elsewhere in this
  // file, and long is not 64 bits on every platform.
  char logBuf[1024];
  snprintf(logBuf, sizeof(logBuf),
           "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
           ", "
           "electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
           ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
           ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);

  int32_t ret = 0;
  syncPingLog2(logBuf, pMsg);

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);

  // host -> network conversion happens inside syncNodeSendMsgById
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // The reply object is no longer needed once it has been converted and sent; release it so it does
  // not leak on every ping.
  // NOTE(review): assumes syncPingReply2RpcMsg copies the reply into rpcMsg, as the other
  // *2RpcMsg / *Destroy pairs in this file suggest — confirm.
  syncPingReplyDestroy(pMsgReply);

  return ret;
}

M
Minghao Li 已提交
1268
// Handle an incoming ping reply: the message is only logged. `ths` is not used.
// Always returns 0.
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return 0;
}
M
Minghao Li 已提交
1273

M
Minghao Li 已提交
1274 1275 1276 1277 1278 1279 1280 1281 1282 1283
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
1284
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
M
Minghao Li 已提交
1285 1286 1287
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
1288 1289 1290 1291 1292
  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
1293 1294
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1295 1296

    // start replicate right now!
M
Minghao Li 已提交
1297
    syncNodeReplicate(ths);
M
Minghao Li 已提交
1298

M
Minghao Li 已提交
1299 1300 1301 1302
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1303
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1304
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1305
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1306 1307 1308 1309 1310 1311 1312
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1313
      }
M
Minghao Li 已提交
1314 1315 1316
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
1317 1318 1319
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
1320
  } else {
M
Minghao Li 已提交
1321 1322 1323 1324
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1325
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1326
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1327
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1328 1329 1330 1331 1332 1333 1334
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 1;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1335
      }
M
Minghao Li 已提交
1336 1337
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1338 1339
  }

M
Minghao Li 已提交
1340
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1341
  return ret;
1342
}
M
Minghao Li 已提交
1343 1344 1345

static void syncFreeNode(void* param) {
  SSyncNode* pNode = param;
M
Minghao Li 已提交
1346 1347
  // inner object already free
  // syncNodePrint2((char*)"==syncFreeNode==", pNode);
M
Minghao Li 已提交
1348

wafwerar's avatar
wafwerar 已提交
1349
  taosMemoryFree(pNode);
M
Minghao Li 已提交
1350
}
S
Shengliang Guan 已提交
1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362

// Map a sync state to its display name.
// Returns "FOLLOWER", "CANDIDATE" or "LEADER" for the matching state and "ERROR" for any other value.
// The returned pointer refers to a string literal and must not be freed.
const char* syncStr(ESyncState state) {
  const char* name = "ERROR";

  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "FOLLOWER";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "CANDIDATE";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "LEADER";
      break;
    default:
      break;
  }

  return name;
}