You need to sign in or sign up before continuing.
syncMain.c 47.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncTimeout.h"
M
Minghao Li 已提交
33
#include "syncUtil.h"
M
Minghao Li 已提交
34
#include "syncVoteMgr.h"
M
Minghao Li 已提交
35
#include "tref.h"
M
Minghao Li 已提交
36

M
Minghao Li 已提交
37 38 39
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
40
// enqueue message ----
M
Minghao Li 已提交
41 42 43 44 45
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
// process message ----
M
Minghao Li 已提交
48 49 50
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
51 52 53

// life cycle
static void syncFreeNode(void* param);
M
Minghao Li 已提交
54 55 56
// ---------------------------------

// Initialise the sync module: open the node ref-set (free callback
// syncFreeNode) and start the sync environment.
// If the environment is already started this is a no-op returning 0.
// Returns 0 on success, -1 if the ref-set could not be opened, otherwise
// whatever syncEnvStart() returns.
int32_t syncInit() {
  if (syncEnvIsStart()) {
    return 0;
  }

  tsNodeRefId = taosOpenRef(200, syncFreeNode);
  if (tsNodeRefId >= 0) {
    return syncEnvStart();
  }

  sError("failed to init node ref");
  syncCleanUp();
  return -1;
}
M
Minghao Li 已提交
72

M
Minghao Li 已提交
73 74 75
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
M
Minghao Li 已提交
76 77 78 79 80

  if (tsNodeRefId != -1) {
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }
M
Minghao Li 已提交
81
}
M
Minghao Li 已提交
82

M
Minghao Li 已提交
83
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
84 85
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87 88
  syncNodeLog2("syncNodeOpen open success", pSyncNode);

M
Minghao Li 已提交
89 90 91 92 93 94 95
  pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
  if (pSyncNode->rid < 0) {
    syncFreeNode(pSyncNode);
    return -1;
  }

  return pSyncNode->rid;
M
Minghao Li 已提交
96
}
M
Minghao Li 已提交
97

M
Minghao Li 已提交
98 99 100 101 102 103 104 105 106 107
void syncStart(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStart(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
108 109 110 111 112 113 114 115 116 117
void syncStartStandBy(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStartStandBy(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
118
void syncStop(int64_t rid) {
M
Minghao Li 已提交
119 120 121 122
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
123
  syncNodeClose(pSyncNode);
M
Minghao Li 已提交
124 125 126

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  taosRemoveRef(tsNodeRefId, rid);
M
Minghao Li 已提交
127
}
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129 130
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  int32_t ret = 0;
M
Minghao Li 已提交
131
  char*   configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
M
Minghao Li 已提交
132 133
  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
S
Shengliang Guan 已提交
134
  rpcMsg.info.noResp = 1;
M
Minghao Li 已提交
135 136 137 138 139
  rpcMsg.contLen = strlen(configChange) + 1;
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
  taosMemoryFree(configChange);
  ret = syncPropose(rid, &rpcMsg, false);
M
Minghao Li 已提交
140 141
  return ret;
}
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143 144 145 146
// Thin alias of syncPropose(): forwards pMsg to the raft group of `rid`.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return syncPropose(rid, pMsg, isWeak); }
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150 151 152 153
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
154 155 156 157
  ESyncState state = pSyncNode->state;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return state;
M
Minghao Li 已提交
158 159
}

M
Minghao Li 已提交
160 161 162 163 164 165 166 167 168 169 170 171
bool syncIsRestoreFinish(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);
  bool b = pSyncNode->restoreFinish;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return b;
}

M
Minghao Li 已提交
172 173 174 175 176
// Human-readable form of syncGetMyRole(rid), via syncUtilState2String.
const char* syncGetMyRoleStr(int64_t rid) { return syncUtilState2String(syncGetMyRole(rid)); }

M
Minghao Li 已提交
177 178 179 180 181 182 183 184 185 186 187 188
int32_t syncGetVgId(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  int32_t vgId = pSyncNode->vgId;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return vgId;
}

M
Minghao Li 已提交
189
SyncTerm syncGetMyTerm(int64_t rid) {
M
Minghao Li 已提交
190 191
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
192 193 194 195 196 197 198 199 200
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return term;
}

M
Minghao Li 已提交
201 202 203 204 205 206 207 208 209
// Fill pEpSet with the fqdn/port of every replica in the node's raft config.
// inUse is set to the config's myIndex. If `rid` is not registered, pEpSet is
// zeroed.
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  assert(rid == pNode->rid);

  const SSyncCfg* pCfg = &pNode->pRaftCfg->cfg;

  pEpSet->numOfEps = 0;
  for (int idx = 0; idx < pCfg->replicaNum; ++idx) {
    const SNodeInfo* pInfo = &pCfg->nodeInfo[idx];

    snprintf(pEpSet->eps[idx].fqdn, sizeof(pEpSet->eps[idx].fqdn), "%s", pInfo->nodeFqdn);
    pEpSet->eps[idx].port = pInfo->nodePort;
    pEpSet->numOfEps++;

    sInfo("syncGetEpSet index:%d %s:%d", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);
  }
  pEpSet->inUse = pCfg->myIndex;

  sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);

  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
// Look up the response stub stored under `index` in the node's resp manager.
// On a hit (syncRespMgrGet returns 1) the stored SRpcMsg is copied into *msg.
// Returns syncRespMgrGet's result, or TAOS_SYNC_STATE_ERROR if `rid` is unknown.
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) return TAOS_SYNC_STATE_ERROR;
  assert(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGet(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    *msg = stub.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return found;
}

// Like syncGetRespRpc, but uses syncRespMgrGetAndDel, which (per its name) also
// removes the stub from the resp manager.
// Returns that call's result, or TAOS_SYNC_STATE_ERROR if `rid` is unknown.
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) return TAOS_SYNC_STATE_ERROR;
  assert(rid == pNode->rid);

  SRespStub stub;
  int32_t   found = syncRespMgrGetAndDel(pNode->pSyncRespMgr, index, &stub);
  if (found == 1) {
    *msg = stub.rpcMsg;
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return found;
}

257
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
M
Minghao Li 已提交
258 259 260 261 262 263
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
    return;
  }
  assert(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
264
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

// Short textual dump of the node registered under `rid` (syncNode2SimpleStr),
// or NULL if no such node is registered.
// NOTE(review): the returned string appears to be heap-allocated by
// syncNode2SimpleStr; the caller presumably frees it — confirm.
char* sync2SimpleStr(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    // log the correct function name and use a format that matches int64_t
    sTrace("sync2SimpleStr get pSyncNode is NULL, rid:%lld", (long long)rid);
    return NULL;
  }
  assert(rid == pSyncNode->rid);
  char* s = syncNode2SimpleStr(pSyncNode);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->electBaseLine = electTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
M
Minghao Li 已提交
318 319
  sTrace("syncPropose msgType:%d ", pMsg->msgType);

M
Minghao Li 已提交
320
  int32_t    ret = TAOS_SYNC_PROPOSE_SUCCESS;
321 322
  SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) return TAOS_SYNC_PROPOSE_OTHER_ERROR;
S
Shengliang Guan 已提交
323

M
Minghao Li 已提交
324 325
  assert(rid == pSyncNode->rid);

M
Minghao Li 已提交
326
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
M
Minghao Li 已提交
327 328 329 330 331 332
    SRespStub stub;
    stub.createTime = taosGetTimestampMs();
    stub.rpcMsg = *pMsg;
    uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
M
Minghao Li 已提交
333 334
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
335 336 337

    if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
      ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
338 339 340
    } else {
      sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
341 342
    syncClientRequestDestroy(pSyncMsg);
  } else {
M
Minghao Li 已提交
343
    sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
344
    ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
M
Minghao Li 已提交
345
  }
M
Minghao Li 已提交
346 347 348 349 350

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

M
Minghao Li 已提交
351
// open/close --------------
352 353 354
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
  SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;

wafwerar's avatar
wafwerar 已提交
355
  SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
M
Minghao Li 已提交
356
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
357
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
358

M
Minghao Li 已提交
359 360 361 362 363 364 365
  int32_t ret = 0;
  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      return NULL;
    }
366
  }
M
Minghao Li 已提交
367

368 369
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
M
Minghao Li 已提交
370 371 372
    // create raft config file
    ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath);
    assert(ret == 0);
373 374 375 376 377 378 379

  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    assert(pSyncNode->pRaftCfg != NULL);
    pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;

M
Minghao Li 已提交
380
    char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
381 382 383 384
    sInfo("syncNodeOpen update config :%s", seralized);
    taosMemoryFree(seralized);

    raftCfgClose(pSyncNode->pRaftCfg);
M
Minghao Li 已提交
385 386
  }

M
Minghao Li 已提交
387
  // init by SSyncInfo
M
Minghao Li 已提交
388 389
  pSyncNode->vgId = pSyncInfo->vgId;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
390
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
391 392
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);

M
Minghao Li 已提交
393
  pSyncNode->pWal = pSyncInfo->pWal;
S
Shengliang Guan 已提交
394
  pSyncNode->msgcb = pSyncInfo->msgcb;
M
Minghao Li 已提交
395
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
396
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
397

M
Minghao Li 已提交
398 399 400 401
  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  assert(pSyncNode->pRaftCfg != NULL);

M
Minghao Li 已提交
402
  // init internal
M
Minghao Li 已提交
403 404
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
405

M
Minghao Li 已提交
406
  // init peersNum, peers, peersId
M
Minghao Li 已提交
407
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
M
Minghao Li 已提交
408
  int j = 0;
M
Minghao Li 已提交
409 410 411
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
M
Minghao Li 已提交
412 413 414
      j++;
    }
  }
M
Minghao Li 已提交
415
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
416
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
417
  }
M
Minghao Li 已提交
418

M
Minghao Li 已提交
419
  // init replicaNum, replicasId
M
Minghao Li 已提交
420 421 422
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
M
Minghao Li 已提交
423 424
  }

M
Minghao Li 已提交
425
  // init raft algorithm
M
Minghao Li 已提交
426
  pSyncNode->pFsm = pSyncInfo->pFsm;
M
Minghao Li 已提交
427
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
M
Minghao Li 已提交
428 429
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
430
  // init life cycle outside
M
Minghao Li 已提交
431

M
Minghao Li 已提交
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
456
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
457
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
458
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
459 460
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
461
  // init TLA+ candidate vars
M
Minghao Li 已提交
462 463 464 465 466
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
467 468 469 470 471 472 473 474 475
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
476
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
477

M
Minghao Li 已提交
478 479 480 481 482
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

M
Minghao Li 已提交
483
  // init ping timer
M
Minghao Li 已提交
484
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
485
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
M
Minghao Li 已提交
486 487
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
488
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
489
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
490

M
Minghao Li 已提交
491 492
  // init elect timer
  pSyncNode->pElectTimer = NULL;
M
Minghao Li 已提交
493
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
494 495
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
496
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
497 498 499 500
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
M
Minghao Li 已提交
501
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
M
Minghao Li 已提交
502 503
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
504
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
505 506
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
507
  // init callback
M
Minghao Li 已提交
508 509
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
510
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
511 512 513 514
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
515
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
516

M
Minghao Li 已提交
517 518 519 520
  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
  assert(pSyncNode->pSyncRespMgr != NULL);

521 522 523 524 525 526 527
  // restore state
  pSyncNode->restoreFinish = false;
  pSyncNode->pSnapshot = NULL;
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
  }
528
  // tsem_init(&(pSyncNode->restoreSem), 0, 0);
529

M
Minghao Li 已提交
530
  // start in syncNodeStart
M
Minghao Li 已提交
531
  // start raft
M
Minghao Li 已提交
532
  // syncNodeBecomeFollower(pSyncNode);
M
Minghao Li 已提交
533

M
Minghao Li 已提交
534 535 536
  return pSyncNode;
}

M
Minghao Li 已提交
537 538
// Start raft on an opened node.
// - With more than one replica the node becomes a follower; a leader is then
//   chosen by election.
// - With a single replica the node becomes leader immediately and appends a
//   no-op entry. Per Raft thesis 3.6.2, entries from earlier terms are only
//   committed indirectly through an entry of the current term. It then tries
//   to advance the commit index right away.
// Neither path waits for restore to finish (an earlier blocking wait on
// restoreSem was removed).
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode);
    sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
    return;
  }

  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change become leader immediately==", pSyncNode);

  // Raft 3.6.2 Committing entries from previous terms
  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica

  sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
}

M
Minghao Li 已提交
587 588 589 590 591 592 593 594 595 596 597
// Enter stand-by mode: become follower, stop heartbeats, and push the
// election timeout out to TIMER_MAX_MS so that this node does not start an
// election soon.
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t restartCode = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(restartCode == 0);
}

M
Minghao Li 已提交
598
// Release everything owned by pSyncNode except the node struct itself.
// The struct is freed later by syncFreeNode via the ref-set.
// Also frees pSyncNode->pFsm, which syncNodeOpen took over from SSyncInfo.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);

  // Stop timers first so no timer fires while the stores and managers below
  // are being destroyed. Stopping only touches the node's own timer fields.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);
  (void)ret;  // only checked by assert
  pSyncNode->pRaftStore = NULL;

  // The node outlives this call (it is freed in syncFreeNode), so clear every
  // freed member to avoid dangling pointers.
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  if (pSyncNode->pSnapshot != NULL) {
    taosMemoryFree(pSyncNode->pSnapshot);
    pSyncNode->pSnapshot = NULL;
  }

  // the node memory itself is freed in syncFreeNode
}

M
Minghao Li 已提交
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
// ping --------------
// Serialize pMsg into an SRpcMsg and send it to destRaftId.
// Both the ping and the rpc form are logged. Returns syncNodeSendMsgById's result.
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg wire;
  syncPing2RpcMsg(pMsg, &wire);
  syncRpcMsgLog2((char*)"==syncNodePing==", &wire);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &wire);
}

// Send a ping from this node to itself (source == destination == myRaftId).
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  const SRaftId* pSelf = &pSyncNode->myRaftId;
  SyncPing*      pPing = syncPingBuild3(pSelf, pSelf, pSyncNode->vgId);

  int32_t code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);

  syncPingDestroy(pPing);
  return code;
}

// Ping every peer (all replicas except self).
// Returns the result of the last ping, or 0 if there are no peers.
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t code = 0;
  int     idx = 0;
  while (idx < pSyncNode->peersNum) {
    SRaftId*  pDest = &pSyncNode->peersId[idx];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
    ++idx;
  }
  return code;
}

// Ping every replica in the raft config, including this node.
// Returns the result of the last ping, or 0 if the config has no replicas.
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t code = 0;
  int     idx = 0;
  while (idx < pSyncNode->pRaftCfg->cfg.replicaNum) {
    SRaftId*  pDest = &pSyncNode->replicasId[idx];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
    ++idx;
  }
  return code;
}

// timer control --------------
// (Re)arm the ping timer for pingTimerMS, then copy the user logic clock into
// the timer's logic clock.
// NOTE(review): presumably the callback compares these clocks to drop ticks
// from a stopped timer — confirm in the timer/timeout handler.
// Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  void* pMgr = gSyncEnv->pTimerManager;
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, pMgr, &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

// Stop the ping timer. The user logic clock is bumped first so it no longer
// matches the timer's clock, and the timer handle is cleared.
// Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);

  void* pTimer = pSyncNode->pPingTimer;
  pSyncNode->pPingTimer = NULL;
  taosTmrStop(pTimer);
  return 0;
}

// Record `ms` as the election timeout and (re)arm the elect timer with it,
// then copy the user logic clock into the timer's logic clock.
// Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  void* pMgr = gSyncEnv->pTimerManager;

  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, pMgr, &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return 0;
}

// Stop the elect timer. The user logic clock is bumped first so it no longer
// matches the timer's clock, and the timer handle is cleared.
// Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);

  void* pTimer = pSyncNode->pElectTimer;
  pSyncNode->pElectTimer = NULL;
  taosTmrStop(pTimer);
  return 0;
}

// Stop the elect timer and start it again with a timeout of `ms`.
// The stop/start results are ignored; always returns 0.
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
719 720
// Restart the elect timer with a fresh random timeout drawn by
// syncUtilElectRandomMS(electBaseLine, 2 * electBaseLine).
// Returns syncNodeRestartElectTimer's result.
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t timeoutMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  return syncNodeRestartElectTimer(pSyncNode, timeoutMS);
}

M
Minghao Li 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
// (Re)arm the heartbeat timer for heartbeatTimerMS, then copy the user logic
// clock into the timer's logic clock. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  void* pMgr = gSyncEnv->pTimerManager;
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, pMgr,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return 0;
}

// Stop the heartbeat timer. The user logic clock is bumped first so it no
// longer matches the timer's clock, and the timer handle is cleared.
// Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);

  void* pTimer = pSyncNode->pHeartbeatTimer;
  pSyncNode->pHeartbeatTimer = NULL;
  taosTmrStop(pTimer);
  return 0;
}

// utils --------------
/*
 * Send pMsg to the node identified by destRaftId via pSyncNode->FpSendMsg.
 *
 * The body is converted to network byte order and the message is flagged as not expecting a
 * response. Callers in this file (e.g. syncNodeOnPingCb) do not free pMsg->pCont after calling
 * this, so the content is treated as handed over. When no send callback is installed, the
 * content is released here instead of being leaked.
 *
 * Always returns 0.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
    // Nobody will take ownership of the body; free it so it does not leak.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return 0;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the node described by nodeInfo via pSyncNode->FpSendMsg.
 *
 * The body is converted to network byte order and the message is flagged as not expecting a
 * response. The content is treated as handed over by the caller. When no send callback is
 * installed, it is released here instead of being leaked.
 *
 * Always returns 0.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
    // Nobody will take ownership of the body; free it so it does not leak.
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return 0;
  }

  // htonl
  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
773 774 775 776
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
  char   u64buf[128];
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
777 778 779
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
780
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
781
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
782 783 784
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
785 786 787
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
788
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
789 790 791 792
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
793
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
815

M
Minghao Li 已提交
816 817 818 819 820 821
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
822

M
Minghao Li 已提交
823 824 825 826 827 828 829
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
830 831 832 833
    // life cycle
    snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
834 835 836
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
837
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
838 839 840 841 842 843 844 845 846 847 848

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
849
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
850 851
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
852 853 854 855 856
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
857 858 859 860
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
861
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
862
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
863
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
864 865 866
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
867
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
868 869 870 871 872 873
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
874
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
875
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
876
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
877 878 879
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
880
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
881 882 883 884 885 886
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
887
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
888
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
889
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
890 891 892
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
893
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
911 912 913 914 915 916 917
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
918 919 920 921 922 923 924
char* syncNode2Str(const SSyncNode* pSyncNode) {
  // Render the node through a temporary cJSON tree. The tree is dropped before returning, and the
  // caller owns the printed string (callers in this file release it with taosMemoryFree).
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
925 926 927 928 929 930 931 932 933 934 935 936 937
/*
 * Render a one-line summary of pSyncNode: vgId, current term, commit index, role, and
 * election-timer state.
 *
 * Returns a 256-byte buffer allocated with taosMemoryMalloc that the caller owns, or NULL if
 * the allocation fails. Output longer than the buffer is truncated by snprintf.
 */
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
  if (s == NULL) {
    return NULL;
  }
  // Use the <inttypes.h> conversions for the 64-bit fields; %lu/%ld do not match them on
  // platforms where long is 32 bits.
  snprintf(s, len,
           "syncNode2SimpleStr vgId:%d currentTerm:%" PRIu64 ", commitIndex:%" PRId64
           ", state:%d %s, electTimerLogicClock:%" PRIu64 ", "
           "electTimerLogicClockUser:%" PRIu64 ", "
           "electTimerMS:%d",
           pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
           syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser,
           pSyncNode->electTimerMS);
  return s;
}

M
Minghao Li 已提交
938
/*
 * Switch pSyncNode to the membership described by newConfig.
 *
 * Locates this node in newConfig by fqdn + port and writes its slot into newConfig->myIndex.
 * It then copies and persists the config and rebuilds the derived identity, peer and replica
 * tables. Finally it asks the index managers and vote trackers to refresh.
 */
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
  // Find our own slot in the new membership.
  bool hit = false;
  for (int idx = 0; !hit && idx < newConfig->replicaNum; ++idx) {
    bool sameFqdn = strcmp(pSyncNode->myNodeInfo.nodeFqdn, newConfig->nodeInfo[idx].nodeFqdn) == 0;
    if (sameFqdn && pSyncNode->myNodeInfo.nodePort == newConfig->nodeInfo[idx].nodePort) {
      newConfig->myIndex = idx;
      hit = true;
    }
  }
  ASSERT(hit == true);

  // Adopt and persist the new configuration.
  pSyncNode->pRaftCfg->cfg = *newConfig;
  int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
  ASSERT(ret == 0);

  SSyncCfg* pCfg = &pSyncNode->pRaftCfg->cfg;

  // Identity derived from our slot.
  pSyncNode->myNodeInfo = pCfg->nodeInfo[pCfg->myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

  // Every other slot is a peer, kept in config order.
  pSyncNode->peersNum = pCfg->replicaNum - 1;
  int peer = 0;
  for (int idx = 0; idx < pCfg->replicaNum; ++idx) {
    if (idx == pCfg->myIndex) {
      continue;
    }
    pSyncNode->peersNodeInfo[peer] = pCfg->nodeInfo[idx];
    ++peer;
  }
  for (int k = 0; k < pSyncNode->peersNum; ++k) {
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[k], pSyncNode->vgId, &pSyncNode->peersId[k]);
  }

  // All replicas, including ourselves.
  pSyncNode->replicaNum = pCfg->replicaNum;
  for (int idx = 0; idx < pCfg->replicaNum; ++idx) {
    syncUtilnodeInfo2raftId(&pCfg->nodeInfo[idx], pSyncNode->vgId, &pSyncNode->replicasId[idx]);
  }

  // Let per-replica bookkeeping pick up the new replica set (presumably re-reads replicaNum /
  // replicasId from pSyncNode — confirm in the respective *Update helpers).
  syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
  syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
  voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
  votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

  syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}

M
Minghao Li 已提交
985 986 987 988 989 990 991 992 993 994 995
SSyncNode* syncNodeAcquire(int64_t rid) {
  // Look up the node in the ref pool; a successful acquire must be paired with syncNodeRelease.
  SSyncNode* pAcquired = taosAcquireRef(tsNodeRefId, rid);
  if (pAcquired != NULL) {
    return pAcquired;
  }
  sTrace("failed to acquire node from refId:%" PRId64, rid);
  return NULL;
}

void syncNodeRelease(SSyncNode* pNode) {
  // Give back the reference obtained through syncNodeAcquire.
  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
996 997 998 999 1000 1001 1002 1003 1004 1005
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  // Only a strictly newer term matters. On seeing one: adopt it, step down to follower, and
  // forget the vote cast in the old term.
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }
  raftStoreSetTerm(pSyncNode->pRaftStore, term);
  syncNodeBecomeFollower(pSyncNode);
  raftStoreClearVote(pSyncNode->pRaftStore);
}

void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  // A node stepping down from leadership no longer knows who leads; clear the cached leader.
  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  // Followers send no heartbeats; they wait on a freshly randomized election timeout instead.
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
// Implementation: switch to LEADER, cache ourselves as leader, and reset nextIndex/matchIndex
// for every replica. Then stop the election timer, replicate immediately, and start the
// heartbeat timer.
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // nextIndex[j] = Len(log[i]) + 1 for every replica j. The loop does not touch the log store,
  // so the last index is loop-invariant: query it once instead of once per replica.
  SyncIndex nextIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pNextIndex->index[i] = nextIndex;
  }

  // matchIndex[j] = 0 for every replica j (represented here by SYNC_INDEX_INVALID).
  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    // maybe overwrite myself, no harm
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start replicate right now!
  syncNodeReplicate(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
}

void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  // Preconditions: still a candidate, and a majority of votes has been granted.
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  // Raft 3.6.2 (committing entries from previous terms): append a no-op in the new term directly
  // to the local log (the enqueue-based syncNodeEqNoop variant is intentionally not used).
  syncNodeAppendNoop(pSyncNode);
  // Try to advance the commit index right away; this can succeed when there is only one replica.
  syncMaybeAdvanceCommitIndex(pSyncNode);
}

void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  // Only a follower may start campaigning; this just flips the role and logs it.
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
}

void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  // Step down from LEADER; syncNodeBecomeFollower clears the leader cache and swaps timers.
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}

void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  // Abandon the campaign and fall back to FOLLOWER.
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode);
  syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}

// raft vote --------------
M
Minghao Li 已提交
1105 1106 1107

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1108 1109 1110 1111 1112 1113 1114
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1115
// simulate get vote from outside
M
Minghao Li 已提交
1116 1117 1118
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

M
Minghao Li 已提交
1119
  SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId);
M
Minghao Li 已提交
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  syncRequestVoteReplyDestroy(pMsg);
}

M
Minghao Li 已提交
1130 1131 1132 1133 1134
// for debug --------------
/*
 * Debug helper: print the JSON form of pObj to stdout and flush all streams.
 * Does nothing if serialization fails, so strlen is never called on NULL.
 */
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  // strlen yields size_t, whose conversion specifier is %zu (not %lu).
  printf("syncNodePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/*
 * Debug helper: print the caller tag s and the JSON form of pObj to stdout, then flush.
 * Does nothing if serialization fails, so strlen is never called on NULL.
 */
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  // strlen yields size_t, whose conversion specifier is %zu (not %lu).
  printf("syncNodePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/*
 * Trace-log the JSON form of pObj through sTraceLong.
 * Does nothing if serialization fails, so strlen is never called on NULL.
 */
void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  // strlen yields size_t, whose conversion specifier is %zu (not %lu).
  sTraceLong("syncNodeLog | len:%zu | %s", strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

/*
 * Trace-log the caller tag s plus the JSON form of pObj through sTraceLong.
 * Does nothing if serialization fails, so strlen is never called on NULL.
 */
void syncNodeLog2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  if (serialized == NULL) {
    return;
  }
  // strlen yields size_t, whose conversion specifier is %zu (not %lu).
  sTraceLong("syncNodeLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
1157
// ------ local funciton ---------
M
Minghao Li 已提交
1158
// enqueue message ----
M
Minghao Li 已提交
1159 1160
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
1161
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
1162
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
1163
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1164 1165
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1166
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
1167
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1168
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1169 1170 1171
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1172 1173
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1174
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1175 1176
                 &pSyncNode->pPingTimer);
  } else {
1177
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1178
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1179 1180 1181 1182 1183 1184 1185
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
M
Minghao Li 已提交
1186
                                              pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1187
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
1188
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1189
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
1190
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1191
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1192 1193 1194
    } else {
      sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1195 1196
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1197
    // reset timer ms
M
Minghao Li 已提交
1198
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
1199 1200
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
M
Minghao Li 已提交
1201
  } else {
1202
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1203
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1204 1205 1206
  }
}

M
Minghao Li 已提交
1207 1208 1209 1210 1211 1212
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
M
Minghao Li 已提交
1213
                          pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1214 1215
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1216
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
1217
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1218
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1219 1220 1221
    } else {
      sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1222 1223
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1224
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1225 1226
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
1227 1228
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
1229 1230 1231 1232
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
1233 1234 1235 1236 1237 1238
/*
 * Leader-only: build a no-op entry for the next log index in the current term. Wrap it in a
 * SyncClientRequest and enqueue it through FpEqMsg, so it goes through the normal
 * client-request path.
 *
 * All temporaries (serialized buffer, request message, and the entry itself) are released
 * before returning. Previously pEntry was never destroyed and leaked on every call.
 * Always returns 0.
 */
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  assert(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  assert(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  assert(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->FpEqMsg != NULL) {
    ths->FpEqMsg(ths->msgcb, &rpcMsg);
  } else {
    sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
  }

  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);
  // The entry was only needed to produce the serialized payload; release it (matches
  // syncNodeAppendNoop, which also destroys its no-op entry).
  syncEntryDestory(pEntry);

  return ret;
}

// Build a no-op entry for the next log index in the current term. If we are leader, append it
// locally and replicate immediately. The temporary entry is always destroyed. Returns 0.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex       nextIdx = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        curTerm = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(curTerm, nextIdx, ths->vgId);
  assert(pEntry != NULL);

  bool isLeader = (ths->state == TAOS_SYNC_STATE_LEADER);
  if (isLeader) {
    ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
    syncNodeReplicate(ths);
  }

  syncEntryDestory(pEntry);
  return 0;
}

M
Minghao Li 已提交
1279
// on message ----
M
Minghao Li 已提交
1280 1281 1282 1283 1284 1285 1286 1287 1288
/*
 * Handle an incoming ping: log it together with this node's election-timer state, then answer
 * with a SyncPingReply addressed back to the ping's sender.
 *
 * Always returns 0.
 */
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  // log state; the 64-bit term/clock values use <inttypes.h> conversions instead of %lu.
  char logBuf[1024];
  snprintf(logBuf, sizeof(logBuf),
           "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
           ", electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
           ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
           ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);

  int32_t ret = 0;
  syncPingLog2(logBuf, pMsg);

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // As with the other *2RpcMsg conversions in this file, the payload now lives in rpcMsg, so the
  // reply object can be released. Previously it leaked on every ping.
  syncPingReplyDestroy(pMsgReply);

  return ret;
}

M
Minghao Li 已提交
1307
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  // Ping replies change no state here; they are only logged. Always returns 0.
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return 0;
}
M
Minghao Li 已提交
1312

M
Minghao Li 已提交
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
1323
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
M
Minghao Li 已提交
1324 1325 1326
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
1327 1328 1329 1330 1331
  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
1332 1333
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1334 1335

    // start replicate right now!
M
Minghao Li 已提交
1336
    syncNodeReplicate(ths);
M
Minghao Li 已提交
1337

M
Minghao Li 已提交
1338 1339 1340 1341
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1342
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1343
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1344
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1345 1346 1347 1348 1349 1350 1351
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1352
      }
M
Minghao Li 已提交
1353 1354 1355
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
1356 1357 1358
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
1359
  } else {
M
Minghao Li 已提交
1360 1361 1362 1363
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1364
    if (ths->pFsm != NULL) {
M
Minghao Li 已提交
1365
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
M
Minghao Li 已提交
1366
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1367 1368 1369 1370 1371 1372 1373
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 1;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1374
      }
M
Minghao Li 已提交
1375 1376
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1377 1378
  }

M
Minghao Li 已提交
1379
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1380
  return ret;
1381
}
M
Minghao Li 已提交
1382 1383 1384

static void syncFreeNode(void* param) {
  SSyncNode* pNode = param;
M
Minghao Li 已提交
1385 1386
  // inner object already free
  // syncNodePrint2((char*)"==syncFreeNode==", pNode);
M
Minghao Li 已提交
1387

wafwerar's avatar
wafwerar 已提交
1388
  taosMemoryFree(pNode);
M
Minghao Li 已提交
1389
}
S
Shengliang Guan 已提交
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401

// Map a sync role to a constant display name; any unknown value maps to "ERROR".
const char* syncStr(ESyncState state) {
  const char* pName = "ERROR";
  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      pName = "FOLLOWER";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      pName = "CANDIDATE";
      break;
    case TAOS_SYNC_STATE_LEADER:
      pName = "LEADER";
      break;
    default:
      break;
  }
  return pName;
}