syncMain.c 64.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncSnapshot.h"
M
Minghao Li 已提交
33
#include "syncTimeout.h"
M
Minghao Li 已提交
34
#include "syncUtil.h"
M
Minghao Li 已提交
35
#include "syncVoteMgr.h"
M
Minghao Li 已提交
36
#include "tref.h"
M
Minghao Li 已提交
37

38
bool gRaftDetailLog = false;
39

M
Minghao Li 已提交
40 41 42
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
43
// enqueue message ----
M
Minghao Li 已提交
44 45 46 47 48
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
M
Minghao Li 已提交
49

M
Minghao Li 已提交
50
// process message ----
M
Minghao Li 已提交
51 52 53
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
54 55 56

// life cycle
static void syncFreeNode(void* param);
M
Minghao Li 已提交
57 58 59
// ---------------------------------

int32_t syncInit() {
M
Minghao Li 已提交
60 61 62 63 64 65 66 67 68 69 70
  int32_t ret = 0;

  if (!syncEnvIsStart()) {
    tsNodeRefId = taosOpenRef(200, syncFreeNode);
    if (tsNodeRefId < 0) {
      sError("failed to init node ref");
      syncCleanUp();
      ret = -1;
    } else {
      ret = syncEnvStart();
    }
M
Minghao Li 已提交
71 72
  }

M
Minghao Li 已提交
73
  return ret;
M
Minghao Li 已提交
74
}
M
Minghao Li 已提交
75

M
Minghao Li 已提交
76 77 78
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
M
Minghao Li 已提交
79 80 81 82 83

  if (tsNodeRefId != -1) {
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }
M
Minghao Li 已提交
84
}
M
Minghao Li 已提交
85

M
Minghao Li 已提交
86
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
87 88
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
89

90 91 92
  if (gRaftDetailLog) {
    syncNodeLog2("syncNodeOpen open success", pSyncNode);
  }
M
Minghao Li 已提交
93

M
Minghao Li 已提交
94 95 96 97 98 99 100
  pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
  if (pSyncNode->rid < 0) {
    syncFreeNode(pSyncNode);
    return -1;
  }

  return pSyncNode->rid;
M
Minghao Li 已提交
101
}
M
Minghao Li 已提交
102

M
Minghao Li 已提交
103 104 105 106 107
void syncStart(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122

  if (pSyncNode->pRaftCfg->isStandBy) {
    syncNodeStartStandBy(pSyncNode);
  } else {
    syncNodeStart(pSyncNode);
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void syncStartNormal(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
123 124 125 126 127
  syncNodeStart(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
128 129 130 131 132 133 134 135 136 137
void syncStartStandBy(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStartStandBy(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
138
void syncStop(int64_t rid) {
M
Minghao Li 已提交
139 140 141 142
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
143
  syncNodeClose(pSyncNode);
M
Minghao Li 已提交
144 145 146

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  taosRemoveRef(tsNodeRefId, rid);
M
Minghao Li 已提交
147
}
M
Minghao Li 已提交
148

M
Minghao Li 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
int32_t syncSetStandby(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return -1;
  }

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return -1;
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  ASSERT(ret == 0);

  pSyncNode->pRaftCfg->isStandBy = 1;
  raftCfgPersist(pSyncNode->pRaftCfg);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return 0;
}

M
Minghao Li 已提交
176 177
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  int32_t ret = 0;
178
  char*   newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg);
179 180

  if (gRaftDetailLog) {
181
    sInfo("==syncReconfig== newconfig:%s", newconfig);
182
  }
M
Minghao Li 已提交
183

M
Minghao Li 已提交
184
  SRpcMsg rpcMsg = {0};
185
  rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
S
Shengliang Guan 已提交
186
  rpcMsg.info.noResp = 1;
187
  rpcMsg.contLen = strlen(newconfig) + 1;
M
Minghao Li 已提交
188
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
189 190
  snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
  taosMemoryFree(newconfig);
M
Minghao Li 已提交
191
  ret = syncPropose(rid, &rpcMsg, false);
M
Minghao Li 已提交
192 193
  return ret;
}
M
Minghao Li 已提交
194

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
int32_t syncLeaderTransfer(int64_t rid) {
  int32_t ret = 0;

  return ret;
}

int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);
  int32_t ret = 0;

  if (pSyncNode->replicaNum == 1) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    sError("only one replica, cannot drop leader");
    return TAOS_SYNC_ONLY_ONE_REPLICA;
  }

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  ASSERT(pMsg != NULL);
  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  ret = syncPropose(rid, &rpcMsg, false);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

229 230 231 232 233 234 235 236 237 238 239 240 241 242
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
  int32_t ret = 0;
  char*   newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);

  pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
  pRpcMsg->info.noResp = 1;
  pRpcMsg->contLen = strlen(newconfig) + 1;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
  taosMemoryFree(newconfig);

  return ret;
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
bool syncCanLeaderTransfer(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);

  if (pSyncNode->replicaNum == 1) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return true;
  }

  bool matchOK = true;
  if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncIndex myCommitIndex = pSyncNode->commitIndex;
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
      if (peerMatchIndex < myCommitIndex) {
        matchOK = false;
      }
    }
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return matchOK;
}

int32_t syncGiveUpLeader(int64_t rid) { return 0; }

M
Minghao Li 已提交
277 278 279 280
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
  int32_t ret = syncPropose(rid, pMsg, isWeak);
  return ret;
}
M
Minghao Li 已提交
281

M
Minghao Li 已提交
282 283 284 285 286 287
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
288 289 290 291
  ESyncState state = pSyncNode->state;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return state;
M
Minghao Li 已提交
292 293
}

M
Minghao Li 已提交
294 295 296 297 298 299 300 301 302 303 304 305
bool syncIsRestoreFinish(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);
  bool b = pSyncNode->restoreFinish;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return b;
}

306 307 308 309 310 311
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  assert(rid == pSyncNode->rid);
312
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
313 314 315 316 317

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return 0;
}

M
Minghao Li 已提交
318 319 320 321 322
const char* syncGetMyRoleStr(int64_t rid) {
  const char* s = syncUtilState2String(syncGetMyRole(rid));
  return s;
}

M
Minghao Li 已提交
323 324 325 326 327 328 329 330 331 332 333 334
int32_t syncGetVgId(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  int32_t vgId = pSyncNode->vgId;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return vgId;
}

M
Minghao Li 已提交
335
SyncTerm syncGetMyTerm(int64_t rid) {
M
Minghao Li 已提交
336 337
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
338 339 340 341 342 343 344 345 346
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return term;
}

M
Minghao Li 已提交
347 348 349 350 351 352 353 354 355
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  assert(rid == pSyncNode->rid);
  pEpSet->numOfEps = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
M
Minghao Li 已提交
356 357
    snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
    pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
M
Minghao Li 已提交
358
    (pEpSet->numOfEps)++;
M
Minghao Li 已提交
359 360

    sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
M
Minghao Li 已提交
361 362
  }
  pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
M
Minghao Li 已提交
363

M
Minghao Li 已提交
364
  sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
M
Minghao Li 已提交
365 366 367 368

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);

  SRespStub stub;
  int32_t   ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub);
  if (ret == 1) {
    memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

S
Shengliang Guan 已提交
386
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
M
Minghao Li 已提交
387 388 389 390 391 392 393 394 395
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);

  SRespStub stub;
  int32_t   ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
  if (ret == 1) {
S
Shengliang Guan 已提交
396
    *pInfo = stub.rpcMsg.info;
M
Minghao Li 已提交
397 398 399 400 401 402
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

403
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
M
Minghao Li 已提交
404 405 406 407 408 409
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
    return;
  }
  assert(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
410
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

char* sync2SimpleStr(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid);
    return NULL;
  }
  assert(rid == pSyncNode->rid);
  char* s = syncNode2SimpleStr(pSyncNode);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->electBaseLine = electTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
464
  int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
465

466
  SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
467 468 469
  if (pSyncNode == NULL) {
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }
M
Minghao Li 已提交
470
  assert(rid == pSyncNode->rid);
471
  sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
472

M
Minghao Li 已提交
473
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
M
Minghao Li 已提交
474 475 476 477 478 479
    SRespStub stub;
    stub.createTime = taosGetTimestampMs();
    stub.rpcMsg = *pMsg;
    uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
M
Minghao Li 已提交
480 481
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
482 483 484

    if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
      ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
485 486 487
    } else {
      sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
488 489
    syncClientRequestDestroy(pSyncMsg);
  } else {
M
Minghao Li 已提交
490
    sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
491
    ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
M
Minghao Li 已提交
492
  }
M
Minghao Li 已提交
493 494 495 496 497

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

M
Minghao Li 已提交
498
// open/close --------------
499 500 501
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
  SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;

502 503
  sInfo("sync event vgId:%d sync open", pSyncInfo->vgId);

wafwerar's avatar
wafwerar 已提交
504
  SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
M
Minghao Li 已提交
505
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
506
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
507

M
Minghao Li 已提交
508 509 510 511 512 513 514
  int32_t ret = 0;
  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      return NULL;
    }
515
  }
M
Minghao Li 已提交
516

517 518
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
M
Minghao Li 已提交
519 520 521 522 523
    // create a new raft config file
    SRaftCfgMeta meta;
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotEnable = pSyncInfo->snapshotEnable;
    ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
M
Minghao Li 已提交
524
    assert(ret == 0);
525 526 527 528 529 530 531

  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    assert(pSyncNode->pRaftCfg != NULL);
    pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;

532 533 534 535 536
    if (gRaftDetailLog) {
      char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
      sInfo("syncNodeOpen update config :%s", seralized);
      taosMemoryFree(seralized);
    }
537 538

    raftCfgClose(pSyncNode->pRaftCfg);
M
Minghao Li 已提交
539 540
  }

M
Minghao Li 已提交
541
  // init by SSyncInfo
M
Minghao Li 已提交
542 543
  pSyncNode->vgId = pSyncInfo->vgId;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
544
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
545 546
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);

M
Minghao Li 已提交
547
  pSyncNode->pWal = pSyncInfo->pWal;
S
Shengliang Guan 已提交
548
  pSyncNode->msgcb = pSyncInfo->msgcb;
M
Minghao Li 已提交
549
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
550
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
551

M
Minghao Li 已提交
552 553 554 555
  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  assert(pSyncNode->pRaftCfg != NULL);

M
Minghao Li 已提交
556
  // init internal
M
Minghao Li 已提交
557 558
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
559

M
Minghao Li 已提交
560
  // init peersNum, peers, peersId
M
Minghao Li 已提交
561
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
M
Minghao Li 已提交
562
  int j = 0;
M
Minghao Li 已提交
563 564 565
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
M
Minghao Li 已提交
566 567 568
      j++;
    }
  }
M
Minghao Li 已提交
569
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
570
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
571
  }
M
Minghao Li 已提交
572

M
Minghao Li 已提交
573
  // init replicaNum, replicasId
M
Minghao Li 已提交
574 575 576
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
M
Minghao Li 已提交
577 578
  }

M
Minghao Li 已提交
579
  // init raft algorithm
M
Minghao Li 已提交
580
  pSyncNode->pFsm = pSyncInfo->pFsm;
M
Minghao Li 已提交
581
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
M
Minghao Li 已提交
582 583
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
584
  // init life cycle outside
M
Minghao Li 已提交
585

M
Minghao Li 已提交
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
610
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
611
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
612
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
613 614
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
615
  // init TLA+ candidate vars
M
Minghao Li 已提交
616 617 618 619 620
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
621 622 623 624 625 626 627 628 629
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
630
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
631

M
Minghao Li 已提交
632 633 634 635 636
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

M
Minghao Li 已提交
637
  // init ping timer
M
Minghao Li 已提交
638
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
639
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
M
Minghao Li 已提交
640 641
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
642
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
643
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
644

M
Minghao Li 已提交
645 646
  // init elect timer
  pSyncNode->pElectTimer = NULL;
M
Minghao Li 已提交
647
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
648 649
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
650
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
651 652 653 654
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
M
Minghao Li 已提交
655
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
M
Minghao Li 已提交
656 657
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
658
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
659 660
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
661
  // init callback
M
Minghao Li 已提交
662 663
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
664
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
665
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
666

M
Minghao Li 已提交
667 668 669
  pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb;
  pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb;

M
Minghao Li 已提交
670
  if (pSyncNode->pRaftCfg->snapshotEnable) {
M
Minghao Li 已提交
671
    sInfo("sync node use snapshot");
M
Minghao Li 已提交
672 673 674 675
    pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
    pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
    pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesSnapshotCb;
    pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplySnapshotCb;
M
Minghao Li 已提交
676 677 678 679 680 681 682

  } else {
    sInfo("sync node do not use snapshot");
    pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
    pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
    pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
    pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
683 684
  }

M
Minghao Li 已提交
685 686 687 688
  // tools
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
  assert(pSyncNode->pSyncRespMgr != NULL);

689 690
  // restore state
  pSyncNode->restoreFinish = false;
691 692 693 694 695 696

  // pSyncNode->pSnapshot = NULL;
  // if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
  //   pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
  //   pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
  // }
697
  // tsem_init(&(pSyncNode->restoreSem), 0, 0);
698

M
Minghao Li 已提交
699 700 701 702 703 704 705 706
  // snapshot senders
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
707
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
M
Minghao Li 已提交
708

M
Minghao Li 已提交
709
  // start in syncNodeStart
M
Minghao Li 已提交
710
  // start raft
M
Minghao Li 已提交
711
  // syncNodeBecomeFollower(pSyncNode);
M
Minghao Li 已提交
712

713
  // snapshot meta
714
  // pSyncNode->sMeta.lastConfigIndex = -1;
715

M
Minghao Li 已提交
716 717 718
  return pSyncNode;
}

M
Minghao Li 已提交
719 720
void syncNodeStart(SSyncNode* pSyncNode) {
  // start raft
721
  if (pSyncNode->replicaNum == 1) {
M
Minghao Li 已提交
722
    raftStoreNextTerm(pSyncNode->pRaftStore);
723
    syncNodeBecomeLeader(pSyncNode, "one replica start");
M
format  
Minghao Li 已提交
724

725
    // Raft 3.6.2 Committing entries from previous terms
M
format  
Minghao Li 已提交
726

727 728 729
    // use this now
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica
730

731 732
    if (gRaftDetailLog) {
      syncNodeLog2("==state change become leader immediately==", pSyncNode);
733 734
    }

735 736 737
    return;
  }

738
  syncNodeBecomeFollower(pSyncNode, "first start");
M
Minghao Li 已提交
739

740
  // int32_t ret = 0;
M
Minghao Li 已提交
741
  // ret = syncNodeStartPingTimer(pSyncNode);
742
  // assert(ret == 0);
743

744 745
  if (gRaftDetailLog) {
    syncNodeLog2("==state change become leader immediately==", pSyncNode);
746
  }
M
Minghao Li 已提交
747 748
}

M
Minghao Li 已提交
749 750 751 752 753 754 755 756 757 758 759
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  ASSERT(ret == 0);
}

M
Minghao Li 已提交
760
void syncNodeClose(SSyncNode* pSyncNode) {
761 762
  sInfo("sync event vgId:%d sync close", pSyncNode->vgId);

M
Minghao Li 已提交
763
  int32_t ret;
M
Minghao Li 已提交
764
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
765 766 767 768

  ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);

M
Minghao Li 已提交
769
  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
M
Minghao Li 已提交
770 771 772 773 774
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  votesRespondDestory(pSyncNode->pVotesRespond);
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  logStoreDestory(pSyncNode->pLogStore);
M
Minghao Li 已提交
775
  raftCfgClose(pSyncNode->pRaftCfg);
M
Minghao Li 已提交
776 777 778 779 780

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

M
Minghao Li 已提交
781 782 783 784
  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
  }

M
Minghao Li 已提交
785 786 787 788 789 790 791
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

M
Minghao Li 已提交
792 793 794 795 796
  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

797
  /*
798 799 800
  if (pSyncNode->pSnapshot != NULL) {
    taosMemoryFree(pSyncNode->pSnapshot);
  }
801
  */
802

803
  // tsem_destroy(&pSyncNode->restoreSem);
804

M
Minghao Li 已提交
805 806
  // free memory in syncFreeNode
  // taosMemoryFree(pSyncNode);
M
Minghao Li 已提交
807 808
}

M
Minghao Li 已提交
809 810 811
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }

M
Minghao Li 已提交
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);
  int32_t ret = 0;

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg);

  ret = syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
  return ret;
}

int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  int32_t   ret = 0;
M
Minghao Li 已提交
827
  SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId);
M
Minghao Li 已提交
828 829 830 831 832 833 834 835 836 837
  ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
  assert(ret == 0);

  syncPingDestroy(pMsg);
  return ret;
}

int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
838 839 840
    SRaftId*  destId = &(pSyncNode->peersId[i]);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);
    ret = syncNodePing(pSyncNode, destId, pMsg);
M
Minghao Li 已提交
841 842 843 844 845 846 847 848
    assert(ret == 0);
    syncPingDestroy(pMsg);
  }
  return ret;
}

int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t ret = 0;
M
Minghao Li 已提交
849 850 851 852
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    SRaftId*  destId = &(pSyncNode->replicasId[i]);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);
    ret = syncNodePing(pSyncNode, destId, pMsg);
M
Minghao Li 已提交
853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899
    assert(ret == 0);
    syncPingDestroy(pMsg);
  }
  return ret;
}

// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return ret;
}

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t ret = 0;
  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return ret;
}

int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t ret = 0;
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return ret;
}

M
Minghao Li 已提交
900 901
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
M
Minghao Li 已提交
902 903 904 905 906 907 908
  int32_t electMS;

  if (pSyncNode->pRaftCfg->isStandBy) {
    electMS = TIMER_MAX_MS;
  } else {
    electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  }
M
Minghao Li 已提交
909 910 911 912
  ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  return ret;
}

M
Minghao Li 已提交
913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return ret;
}

// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);
M
Minghao Li 已提交
933
  if (pSyncNode->FpSendMsg != NULL) {
M
Minghao Li 已提交
934 935 936 937 938 939
    if (gRaftDetailLog) {
      char* JsonStr = syncRpcMsg2Str(pMsg);
      syncUtilJson2Line(JsonStr);
      sTrace("sync send msg, vgId:%d, type:%d, msg:%s", pSyncNode->vgId, pMsg->msgType, JsonStr);
      taosMemoryFree(JsonStr);
    }
M
Minghao Li 已提交
940

M
Minghao Li 已提交
941 942 943
    // htonl
    syncUtilMsgHtoN(pMsg->pCont);

944
    pMsg->info.noResp = 1;
S
Shengliang Guan 已提交
945
    pSyncNode->FpSendMsg(&epSet, pMsg);
M
Minghao Li 已提交
946 947 948
  } else {
    sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
  }
M
Minghao Li 已提交
949 950 951 952 953 954
  return 0;
}

int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
M
Minghao Li 已提交
955 956 957 958
  if (pSyncNode->FpSendMsg != NULL) {
    // htonl
    syncUtilMsgHtoN(pMsg->pCont);

959
    pMsg->info.noResp = 1;
S
Shengliang Guan 已提交
960
    pSyncNode->FpSendMsg(&epSet, pMsg);
M
Minghao Li 已提交
961 962 963
  } else {
    sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
  }
M
Minghao Li 已提交
964 965 966
  return 0;
}

M
Minghao Li 已提交
967
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
C
Cary Xu 已提交
968
  char   u64buf[128] = {0};
M
Minghao Li 已提交
969 970
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
971 972 973
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
974
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
975
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
976 977 978
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
979 980 981
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
982
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
983 984 985 986
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
987
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
1009

M
Minghao Li 已提交
1010 1011 1012 1013 1014 1015
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
1016

M
Minghao Li 已提交
1017 1018 1019 1020 1021 1022 1023
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
1024 1025 1026 1027
    // life cycle
    snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
1028 1029 1030
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
1031
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
1043
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
1044 1045
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
1046 1047 1048 1049 1050
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
1051 1052 1053 1054
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
1055
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
1056
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
1057
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1058 1059 1060
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
1061
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
1062 1063 1064 1065 1066 1067
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
1068
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
1069
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
1070
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1071 1072 1073
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
1074
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
1075 1076 1077 1078 1079 1080
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
1081
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
1082
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
1083
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
1084 1085 1086
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
1087
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117

    // restoreFinish
    cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);

    // snapshot senders
    cJSON* pSenders = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "senders", pSenders);
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
    }

    // snapshot receivers
    cJSON* pReceivers = cJSON_CreateArray();
1118
    cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
M
Minghao Li 已提交
1119 1120 1121 1122 1123 1124 1125
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
1126 1127 1128 1129 1130 1131 1132
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pJson = syncNode2Json(pSyncNode);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
1133 1134 1135 1136
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
  snprintf(s, len,
M
Minghao Li 已提交
1137
           "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
1138
           "electTimerLogicClock:%lu, "
M
Minghao Li 已提交
1139
           "electTimerLogicClockUser:%lu, "
M
Minghao Li 已提交
1140
           "electTimerMS:%d, replicaNum:%d",
M
Minghao Li 已提交
1141
           pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
1142
           syncUtilState2String(pSyncNode->state), pSyncNode->pRaftCfg->isStandBy, pSyncNode->electTimerLogicClock,
M
Minghao Li 已提交
1143
           pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, pSyncNode->replicaNum);
M
Minghao Li 已提交
1144 1145 1146
  return s;
}

1147
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
1148
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
M
Minghao Li 已提交
1149
  pSyncNode->pRaftCfg->cfg = *newConfig;
1150 1151
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

M
Minghao Li 已提交
1152
  int32_t ret = 0;
M
Minghao Li 已提交
1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int j = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      j++;
    }
  }
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
  }
M
Minghao Li 已提交
1176 1177 1178

  syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
  syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
M
Minghao Li 已提交
1179 1180
  voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
  votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
M
Minghao Li 已提交
1181

1182 1183
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

1184 1185
  bool IamInOld = false;
  bool IamInNew = false;
1186 1187 1188
  for (int i = 0; i < oldConfig.replicaNum; ++i) {
    if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
1189
      IamInOld = false;
1190 1191 1192 1193
      break;
    }
  }

M
Minghao Li 已提交
1194 1195 1196
  for (int i = 0; i < newConfig->replicaNum; ++i) {
    if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
1197
      IamInNew = false;
M
Minghao Li 已提交
1198 1199 1200
      break;
    }
  }
M
Minghao Li 已提交
1201

1202 1203 1204 1205 1206
  *isDrop = true;
  if (IamInOld && !IamInNew) {
    *isDrop = true;  
  } else {
    *isDrop = false;
M
Minghao Li 已提交
1207 1208
  }

1209 1210 1211
  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;   // change isStandBy to normal
  }
M
Minghao Li 已提交
1212
  raftCfgPersist(pSyncNode->pRaftCfg);
1213 1214 1215 1216

  if (gRaftDetailLog) {
    syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
  }
M
Minghao Li 已提交
1217 1218
}

M
Minghao Li 已提交
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
SSyncNode* syncNodeAcquire(int64_t rid) {
  SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    sTrace("failed to acquire node from refId:%" PRId64, rid);
  }

  return pNode;
}

void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); }

M
Minghao Li 已提交
1230 1231 1232 1233
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term > pSyncNode->pRaftStore->currentTerm) {
    raftStoreSetTerm(pSyncNode->pRaftStore, term);
1234
    syncNodeBecomeFollower(pSyncNode, "update term");
M
Minghao Li 已提交
1235 1236 1237 1238
    raftStoreClearVote(pSyncNode->pRaftStore);
  }
}

1239 1240
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr);
M
Minghao Li 已提交
1241

M
Minghao Li 已提交
1242
  // maybe clear leader cache
M
Minghao Li 已提交
1243 1244 1245 1246
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

M
Minghao Li 已提交
1247
  // state change
M
Minghao Li 已提交
1248 1249 1250
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

M
Minghao Li 已提交
1251 1252
  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);
M
Minghao Li 已提交
1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1273 1274
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr);
M
Minghao Li 已提交
1275

M
Minghao Li 已提交
1276
  // state change
M
Minghao Li 已提交
1277
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
1278 1279

  // set leader cache
M
Minghao Li 已提交
1280 1281 1282
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1283 1284
    // maybe overwrite myself, no harm
    // just do it!
1285 1286 1287 1288 1289 1290 1291 1292 1293

    // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;

    // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm  lastTerm;
    int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
    ASSERT(code == 0);
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
M
Minghao Li 已提交
1294 1295 1296
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
1297 1298
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
1299 1300 1301
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

1302 1303
  // update sender private term
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
1304 1305 1306 1307 1308
  if (pMySender != NULL) {
    for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
      }
1309
    }
1310
    (pMySender->privateTerm) += 100;
1311 1312
  }

M
Minghao Li 已提交
1313
  // stop elect timer
M
Minghao Li 已提交
1314
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
1315 1316

  // start replicate right now!
M
Minghao Li 已提交
1317
  syncNodeReplicate(pSyncNode);
M
Minghao Li 已提交
1318 1319 1320

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
1321 1322 1323 1324 1325
}

void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));
1326
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
M
Minghao Li 已提交
1327

M
Minghao Li 已提交
1328 1329
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

M
Minghao Li 已提交
1330
  // Raft 3.6.2 Committing entries from previous terms
M
Minghao Li 已提交
1331 1332

  // use this now
M
Minghao Li 已提交
1333
  syncNodeAppendNoop(pSyncNode);
1334
  syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica
M
Minghao Li 已提交
1335 1336

  // do not use this
M
Minghao Li 已提交
1337
  // syncNodeEqNoop(pSyncNode);
M
Minghao Li 已提交
1338 1339 1340 1341 1342
}

void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
M
Minghao Li 已提交
1343 1344

  syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
M
Minghao Li 已提交
1345 1346 1347 1348
}

void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
1349
  syncNodeBecomeFollower(pSyncNode, "leader to follower");
M
Minghao Li 已提交
1350 1351

  syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
M
Minghao Li 已提交
1352 1353 1354 1355
}

void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
1356
  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
M
Minghao Li 已提交
1357 1358

  syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
M
Minghao Li 已提交
1359 1360 1361
}

// raft vote --------------
M
Minghao Li 已提交
1362 1363 1364

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1365 1366 1367 1368 1369 1370 1371
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1372
// simulate get vote from outside
M
Minghao Li 已提交
1373 1374 1375
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

M
Minghao Li 已提交
1376
  SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId);
M
Minghao Li 已提交
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  syncRequestVoteReplyDestroy(pMsg);
}

M
Minghao Li 已提交
1387
// snapshot --------------
M
Minghao Li 已提交
1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  bool      ret = false;
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
    if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
      ret = true;
    }
  }
  return ret;
}

1400
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
M
Minghao Li 已提交
1401 1402 1403 1404
  ASSERT(syncNodeHasSnapshot(pSyncNode));
  ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
  ASSERT(index >= SYNC_INDEX_BEGIN);

1405 1406
  SSnapshot snapshot;
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
M
Minghao Li 已提交
1407
  bool b = (index <= snapshot.lastApplyIndex);
1408 1409 1410
  return b;
}

M
Minghao Li 已提交
1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
  }
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);

  SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
  return lastIndex;
}

SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
1424 1425 1426 1427 1428 1429 1430
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
    if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
      pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
    }

M
Minghao Li 已提交
1431 1432 1433
    SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
    if (logLastIndex > snapshot.lastApplyIndex) {
      lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
M
Minghao Li 已提交
1434 1435 1436 1437
    } else {
      lastTerm = snapshot.lastApplyTerm;
    }

M
Minghao Li 已提交
1438
  } else {
M
Minghao Li 已提交
1439 1440
    // no snapshot
    lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
1441
  }
M
Minghao Li 已提交
1442

M
Minghao Li 已提交
1443 1444 1445 1446 1447 1448 1449
  return lastTerm;
}

// get last index and term along with snapshot
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  *pLastIndex = syncNodeGetLastIndex(pSyncNode);
  *pLastTerm = syncNodeGetLastTerm(pSyncNode);
1450 1451
  return 0;
}
M
Minghao Li 已提交
1452

M
Minghao Li 已提交
1453 1454 1455 1456 1457
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
  return syncStartIndex;
}

M
Minghao Li 已提交
1458
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
1459
  ASSERT(index >= SYNC_INDEX_BEGIN);
M
Minghao Li 已提交
1460 1461 1462
  SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
  ASSERT(index <= syncStartIndex);

M
Minghao Li 已提交
1463
  SyncIndex preIndex = index - 1;
M
Minghao Li 已提交
1464 1465 1466
  return preIndex;
}

M
Minghao Li 已提交
1467 1468 1469 1470
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  ASSERT(index >= SYNC_INDEX_BEGIN);
  SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
  ASSERT(index <= syncStartIndex);
M
Minghao Li 已提交
1471

M
Minghao Li 已提交
1472 1473
  if (index == SYNC_INDEX_BEGIN) {
    return 0;
M
Minghao Li 已提交
1474
  }
M
Minghao Li 已提交
1475

M
Minghao Li 已提交
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490
  SyncTerm preTerm = 0;
  if (syncNodeHasSnapshot(pSyncNode)) {
    // has snapshot
    SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
    if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
      pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
    }

    if (index > snapshot.lastApplyIndex + 1) {
      // should be log preTerm
      SSyncRaftEntry* pPreEntry = NULL;
      int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
      ASSERT(code == 0);
      ASSERT(pPreEntry != NULL);

M
Minghao Li 已提交
1491 1492
      preTerm = pPreEntry->term;
      taosMemoryFree(pPreEntry);
M
Minghao Li 已提交
1493 1494 1495

    } else if (index == snapshot.lastApplyIndex + 1) {
      preTerm = snapshot.lastApplyTerm;
M
Minghao Li 已提交
1496

M
Minghao Li 已提交
1497
    } else {
1498
      // maybe snapshot change
M
Minghao Li 已提交
1499 1500 1501 1502 1503 1504 1505 1506 1507 1508
      sError("sync get pre term, bad scene. index:%ld", index);
      logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore);

      SSyncRaftEntry* pPreEntry = NULL;
      int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
      ASSERT(code == 0);
      ASSERT(pPreEntry != NULL);

      preTerm = pPreEntry->term;
      taosMemoryFree(pPreEntry);
1509
    }
M
Minghao Li 已提交
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521

  } else {
    // no snapshot
    ASSERT(index > SYNC_INDEX_BEGIN);

    SSyncRaftEntry* pPreEntry = NULL;
    int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
    ASSERT(code == 0);
    ASSERT(pPreEntry != NULL);

    preTerm = pPreEntry->term;
    taosMemoryFree(pPreEntry);
1522
  }
M
Minghao Li 已提交
1523

M
Minghao Li 已提交
1524 1525
  return preTerm;
}
1526

M
Minghao Li 已提交
1527 1528 1529
// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
M
Minghao Li 已提交
1530
  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
M
Minghao Li 已提交
1531 1532 1533
  return 0;
}

M
Minghao Li 已提交
1534 1535 1536 1537 1538
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
1539
  taosMemoryFree(serialized);
M
Minghao Li 已提交
1540 1541 1542 1543 1544 1545
}

void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
1546
  taosMemoryFree(serialized);
M
Minghao Li 已提交
1547 1548 1549 1550
}

void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
M
Minghao Li 已提交
1551
  sTraceLong("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
1552
  taosMemoryFree(serialized);
M
Minghao Li 已提交
1553 1554 1555
}

void syncNodeLog2(char* s, SSyncNode* pObj) {
1556 1557 1558 1559 1560
  if (gRaftDetailLog) {
    char* serialized = syncNode2Str(pObj);
    sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
    taosMemoryFree(serialized);
  }
M
Minghao Li 已提交
1561 1562
}

M
Minghao Li 已提交
1563
// ------ local funciton ---------
M
Minghao Li 已提交
1564
// enqueue message ----
M
Minghao Li 已提交
1565 1566
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
1567
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
1568
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
1569
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1570 1571
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1572
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
1573
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1574
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1575 1576 1577
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1578 1579
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1580
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1581 1582
                 &pSyncNode->pPingTimer);
  } else {
1583
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1584
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1585 1586 1587 1588 1589 1590 1591
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
M
Minghao Li 已提交
1592
                                              pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1593
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
1594
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1595
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
1596
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1597
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1598 1599 1600
    } else {
      sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1601 1602
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1603
    // reset timer ms
M
Minghao Li 已提交
1604
    pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
1605 1606
    taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
M
Minghao Li 已提交
1607
  } else {
1608
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1609
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1610 1611 1612
  }
}

M
Minghao Li 已提交
1613 1614 1615 1616 1617 1618
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
M
Minghao Li 已提交
1619
                          pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1620 1621
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1622
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
1623
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1624
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1625 1626 1627
    } else {
      sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1628 1629
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1630
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1631 1632
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
1633 1634
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
1635 1636 1637 1638
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
1639 1640 1641 1642 1643 1644
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  assert(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
M
Minghao Li 已提交
1645
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
M
Minghao Li 已提交
1646 1647 1648 1649 1650 1651 1652 1653
  assert(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  assert(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

S
Shengliang Guan 已提交
1654
  SRpcMsg rpcMsg = {0};
M
Minghao Li 已提交
1655
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1656
  if (ths->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1657
    ths->FpEqMsg(ths->msgcb, &rpcMsg);
M
Minghao Li 已提交
1658 1659 1660
  } else {
    sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
  }
M
Minghao Li 已提交
1661

wafwerar's avatar
wafwerar 已提交
1662
  taosMemoryFree(serialized);
M
Minghao Li 已提交
1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
  syncClientRequestDestroy(pSyncMsg);

  return ret;
}

static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  int32_t ret = 0;

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
M
Minghao Li 已提交
1673
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
M
Minghao Li 已提交
1674 1675 1676
  assert(pEntry != NULL);

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
1677 1678
    // ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
    ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1679 1680 1681
    syncNodeReplicate(ths);
  }

M
Minghao Li 已提交
1682
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1683 1684 1685
  return ret;
}

M
Minghao Li 已提交
1686
// on message ----
M
Minghao Li 已提交
1687 1688
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  // log state
1689
  char logBuf[1024] = {0};
M
Minghao Li 已提交
1690 1691 1692 1693 1694 1695
  snprintf(logBuf, sizeof(logBuf),
           "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%lu electTimerLogicClock:%lu, "
           "electTimerLogicClockUser:%lu, electTimerMS:%d",
           ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
           ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);

M
Minghao Li 已提交
1696
  int32_t ret = 0;
M
Minghao Li 已提交
1697 1698
  syncPingLog2(logBuf, pMsg);
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
M
Minghao Li 已提交
1699 1700
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
M
Minghao Li 已提交
1701 1702 1703 1704 1705 1706 1707 1708

  /*
    // htonl
    SMsgHead* pHead = rpcMsg.pCont;
    pHead->contLen = htonl(pHead->contLen);
    pHead->vgId = htonl(pHead->vgId);
  */

M
Minghao Li 已提交
1709 1710 1711 1712 1713
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  return ret;
}

M
Minghao Li 已提交
1714
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
M
Minghao Li 已提交
1715 1716 1717 1718
  int32_t ret = 0;
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return ret;
}
M
Minghao Li 已提交
1719

M
Minghao Li 已提交
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
1730
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
M
Minghao Li 已提交
1731 1732 1733
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
1734
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
1735 1736 1737 1738
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
1739
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
1740 1741
    // ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
    ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1742 1743

    // start replicate right now!
M
Minghao Li 已提交
1744
    syncNodeReplicate(ths);
M
Minghao Li 已提交
1745

M
Minghao Li 已提交
1746 1747 1748 1749
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1750
    if (ths->pFsm != NULL) {
1751
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
M
Minghao Li 已提交
1752
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1753 1754 1755 1756 1757 1758 1759
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1760
      }
M
Minghao Li 已提交
1761 1762 1763
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
1764 1765 1766
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
1767
  } else {
M
Minghao Li 已提交
1768 1769 1770 1771
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1772
    if (ths->pFsm != NULL) {
1773
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
M
Minghao Li 已提交
1774
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1775 1776 1777 1778 1779 1780 1781
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 1;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1782
      }
M
Minghao Li 已提交
1783 1784
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1785 1786
  }

M
Minghao Li 已提交
1787
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1788
  return ret;
1789
}
M
Minghao Li 已提交
1790 1791 1792

static void syncFreeNode(void* param) {
  SSyncNode* pNode = param;
M
Minghao Li 已提交
1793 1794
  // inner object already free
  // syncNodePrint2((char*)"==syncFreeNode==", pNode);
M
Minghao Li 已提交
1795

wafwerar's avatar
wafwerar 已提交
1796
  taosMemoryFree(pNode);
M
Minghao Li 已提交
1797
}
S
Shengliang Guan 已提交
1798 1799 1800 1801

const char* syncStr(ESyncState state) {
  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
1802
      return "follower";
S
Shengliang Guan 已提交
1803
    case TAOS_SYNC_STATE_CANDIDATE:
1804
      return "candidate";
S
Shengliang Guan 已提交
1805
    case TAOS_SYNC_STATE_LEADER:
1806
      return "leader";
S
Shengliang Guan 已提交
1807
    default:
1808
      return "error";
S
Shengliang Guan 已提交
1809
  }
M
Minghao Li 已提交
1810
}
1811

1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncLeaderTransfer* pSyncLeaderTransfer;
  if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
  }

  return 0;
}

static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

  SSyncCfg newSyncCfg;
  int32_t  ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
  ASSERT(ret == 0);

  // update new config myIndex
1828
  bool IamInNew = false;
1829 1830 1831 1832
  for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
      newSyncCfg.myIndex = i;
1833
      IamInNew = true;
1834 1835 1836 1837 1838 1839
      break;
    }
  }

  bool isDrop;

1840
  if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878
    syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);

    // change isStandBy to normal
    if (!isDrop) {
      if (ths->state == TAOS_SYNC_STATE_LEADER) {
        syncNodeBecomeLeader(ths, "config change");
      } else {
        syncNodeBecomeFollower(ths, "config change");
      }
    }

    if (gRaftDetailLog) {
      char* sOld = syncCfg2Str(&oldSyncCfg);
      char* sNew = syncCfg2Str(&newSyncCfg);
      sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
      taosMemoryFree(sOld);
      taosMemoryFree(sNew);
    }
  }

  // always call FpReConfigCb
  if (ths->pFsm->FpReConfigCb != NULL) {
    SReConfigCbMeta cbMeta = {0};
    cbMeta.code = 0;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.index = pEntry->index;
    cbMeta.term = pEntry->term;
    cbMeta.newCfg = newSyncCfg;
    cbMeta.oldCfg = oldSyncCfg;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.flag = 0x11;
    cbMeta.isDrop = isDrop;
    ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
  }

  return 0;
}

1879
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
1880 1881
  int32_t    code = 0;
  ESyncState state = flag;
1882 1883
  sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
        endIndex, syncUtilState2String(state));
1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896

  // execute fsm
  if (ths->pFsm != NULL) {
    for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
      if (i != SYNC_INDEX_INVALID) {
        SSyncRaftEntry* pEntry;
        code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
        ASSERT(code == 0);
        ASSERT(pEntry != NULL);

        SRpcMsg rpcMsg;
        syncEntry2OriginalRpc(pEntry, &rpcMsg);

1897
        // user commit
1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912
        if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
          SFsmCbMeta cbMeta;
          cbMeta.index = pEntry->index;
          cbMeta.isWeak = pEntry->isWeak;
          cbMeta.code = 0;
          cbMeta.state = ths->state;
          cbMeta.seqNum = pEntry->seqNum;
          cbMeta.term = pEntry->term;
          cbMeta.currentTerm = ths->pRaftStore->currentTerm;
          cbMeta.flag = flag;

          ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
        }

        // config change
1913
        if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
1914 1915 1916 1917 1918 1919 1920 1921
          code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
        }

        // config change
        if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
          code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
          ASSERT(code == 0);
1922 1923 1924 1925 1926 1927 1928 1929 1930
        }

        // restore finish
        if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
          if (ths->restoreFinish == false) {
            if (ths->pFsm->FpRestoreFinishCb != NULL) {
              ths->pFsm->FpRestoreFinishCb(ths->pFsm);
            }
            ths->restoreFinish = true;
1931
            sInfo("sync event vgId:%d restore finish", ths->vgId);
1932 1933 1934 1935 1936 1937 1938 1939 1940
          }
        }

        rpcFreeCont(rpcMsg.pCont);
        syncEntryDestory(pEntry);
      }
    }
  }
  return 0;
1941 1942 1943 1944 1945 1946 1947 1948 1949
}

bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  for (int i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
      return true;
    }
  }
  return false;
M
Minghao Li 已提交
1950 1951 1952 1953 1954 1955 1956 1957 1958 1959
}

SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  SSyncSnapshotSender* pSender = NULL;
  for (int i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
      pSender = (ths->senders)[i];
    }
  }
  return pSender;
M
Minghao Li 已提交
1960
}