syncMain.c 71.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "sync.h"
M
Minghao Li 已提交
17 18
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
19
#include "syncCommit.h"
M
Minghao Li 已提交
20
#include "syncElection.h"
M
Minghao Li 已提交
21
#include "syncEnv.h"
M
Minghao Li 已提交
22
#include "syncIndexMgr.h"
M
Minghao Li 已提交
23
#include "syncInt.h"
M
Minghao Li 已提交
24
#include "syncMessage.h"
M
Minghao Li 已提交
25
#include "syncRaftCfg.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncRespMgr.h"
M
Minghao Li 已提交
32
#include "syncSnapshot.h"
M
Minghao Li 已提交
33
#include "syncTimeout.h"
M
Minghao Li 已提交
34
#include "syncUtil.h"
M
Minghao Li 已提交
35
#include "syncVoteMgr.h"
M
Minghao Li 已提交
36
#include "tref.h"
M
Minghao Li 已提交
37

38
bool gRaftDetailLog = false;
39

M
Minghao Li 已提交
40 41 42
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
43
// enqueue message ----
M
Minghao Li 已提交
44 45 46 47 48
static void    syncNodeEqPingTimer(void* param, void* tmrId);
static void    syncNodeEqElectTimer(void* param, void* tmrId);
static void    syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
M
Minghao Li 已提交
49

M
Minghao Li 已提交
50
// process message ----
M
Minghao Li 已提交
51 52 53
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
54 55 56

// life cycle
static void syncFreeNode(void* param);
M
Minghao Li 已提交
57 58 59
// ---------------------------------

/*
 * One-time initialisation of the sync module.
 *
 * If the sync environment is not running yet, opens the SSyncNode reference
 * pool and then starts the environment. If the environment is already
 * running, nothing happens.
 *
 * Returns 0 when already started, the result of syncEnvStart() otherwise,
 * or -1 if the reference pool could not be opened.
 */
int32_t syncInit() {
  if (syncEnvIsStart()) {
    return 0;
  }

  tsNodeRefId = taosOpenRef(200, syncFreeNode);
  if (tsNodeRefId >= 0) {
    return syncEnvStart();
  }

  sError("failed to init node ref");
  syncCleanUp();
  return -1;
}
M
Minghao Li 已提交
75

M
Minghao Li 已提交
76 77 78
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
M
Minghao Li 已提交
79 80 81 82 83

  if (tsNodeRefId != -1) {
    taosCloseRef(tsNodeRefId);
    tsNodeRefId = -1;
  }
M
Minghao Li 已提交
84
}
M
Minghao Li 已提交
85

M
Minghao Li 已提交
86
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
87 88
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
89

90 91 92
  if (gRaftDetailLog) {
    syncNodeLog2("syncNodeOpen open success", pSyncNode);
  }
M
Minghao Li 已提交
93

M
Minghao Li 已提交
94 95 96 97 98 99 100
  pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
  if (pSyncNode->rid < 0) {
    syncFreeNode(pSyncNode);
    return -1;
  }

  return pSyncNode->rid;
M
Minghao Li 已提交
101
}
M
Minghao Li 已提交
102

M
Minghao Li 已提交
103 104 105 106 107
void syncStart(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122

  if (pSyncNode->pRaftCfg->isStandBy) {
    syncNodeStartStandBy(pSyncNode);
  } else {
    syncNodeStart(pSyncNode);
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void syncStartNormal(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
123 124 125 126 127
  syncNodeStart(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
128 129 130 131 132 133 134 135 136 137
void syncStartStandBy(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  syncNodeStartStandBy(pSyncNode);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

M
Minghao Li 已提交
138
void syncStop(int64_t rid) {
M
Minghao Li 已提交
139 140 141 142
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
M
Minghao Li 已提交
143
  syncNodeClose(pSyncNode);
M
Minghao Li 已提交
144 145 146

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  taosRemoveRef(tsNodeRefId, rid);
M
Minghao Li 已提交
147
}
M
Minghao Li 已提交
148

M
Minghao Li 已提交
149 150 151
int32_t syncSetStandby(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
152
    return TAOS_SYNC_OTHER_ERROR;
M
Minghao Li 已提交
153 154
  }

155
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
M
Minghao Li 已提交
156
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
M
Minghao Li 已提交
157
    return TAOS_SYNC_OTHER_ERROR;
M
Minghao Li 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
  }

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  ASSERT(ret == 0);

  pSyncNode->pRaftCfg->isStandBy = 1;
  raftCfgPersist(pSyncNode->pRaftCfg);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
173
  sInfo("vgId:%d, set to standby", pSyncNode->vgId);
M
Minghao Li 已提交
174 175 176
  return 0;
}

M
Minghao Li 已提交
177 178 179 180 181 182 183
/*
 * Build (but do not propose) a TDMT_SYNC_CONFIG_CHANGE message that carries
 * pNewCfg serialized as a string.
 *
 * On success pRpcMsg->pCont holds a NUL-terminated copy of the serialized
 * config, allocated with rpcMallocCont().
 *
 * Returns 0 on success, TAOS_SYNC_NOT_IN_NEW_CONFIG if this node is not listed
 * in pNewCfg, or TAOS_SYNC_OTHER_ERROR if rid is unknown or serialization or
 * allocation fails.
 */
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_OTHER_ERROR;
  }
  ASSERT(rid == pSyncNode->rid);

  // Membership is matched by FQDN + port rather than by SRaftId.
  bool IamInNew = false;
  for (int i = 0; i < pNewCfg->replicaNum; ++i) {
    if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
      IamInNew = true;
      break;
    }
  }

  if (!IamInNew) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_NOT_IN_NEW_CONFIG;
  }

  char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
  if (newconfig == NULL) {
    sError("vgId:%d, failed to serialize new sync config", pSyncNode->vgId);
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_OTHER_ERROR;
  }

  pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
  pRpcMsg->info.noResp = 1;
  pRpcMsg->contLen = strlen(newconfig) + 1;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont == NULL) {
    // Do not hand NULL to snprintf(); report the allocation failure instead.
    sError("vgId:%d, failed to alloc config change msg, len:%d", pSyncNode->vgId, (int)pRpcMsg->contLen);
    taosMemoryFree(newconfig);
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_OTHER_ERROR;
  }
  snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
  taosMemoryFree(newconfig);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return 0;
}

int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_OTHER_ERROR;
  }
  ASSERT(rid == pSyncNode->rid);

  bool IamInNew = false;
  for (int i = 0; i < pNewCfg->replicaNum; ++i) {
228 229
    if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
M
Minghao Li 已提交
230 231
      IamInNew = true;
    }
232 233 234 235 236 237 238 239 240 241 242 243

    /*
        // some problem in inet_addr

        SRaftId newId = EMPTY_RAFT_ID;
        newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
        newId.vgId = pSyncNode->vgId;

        if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
          IamInNew = true;
        }
      */
M
Minghao Li 已提交
244
  }
M
Minghao Li 已提交
245

M
Minghao Li 已提交
246
  if (!IamInNew) {
M
Minghao Li 已提交
247
    sError("sync reconfig error, not in new config");
M
Minghao Li 已提交
248 249 250
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_NOT_IN_NEW_CONFIG;
  }
251

M
Minghao Li 已提交
252
  char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
253
  if (gRaftDetailLog) {
254
    sInfo("==syncReconfig== newconfig:%s", newconfig);
255
  }
M
Minghao Li 已提交
256

M
Minghao Li 已提交
257 258
  int32_t ret = 0;

M
Minghao Li 已提交
259
  SRpcMsg rpcMsg = {0};
260
  rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
S
Shengliang Guan 已提交
261
  rpcMsg.info.noResp = 1;
262
  rpcMsg.contLen = strlen(newconfig) + 1;
M
Minghao Li 已提交
263
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
264 265
  snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
  taosMemoryFree(newconfig);
M
Minghao Li 已提交
266 267 268
  ret = syncNodePropose(pSyncNode, &rpcMsg, false);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
M
Minghao Li 已提交
269 270
  return ret;
}
M
Minghao Li 已提交
271

272
int32_t syncLeaderTransfer(int64_t rid) {
M
Minghao Li 已提交
273 274 275 276 277 278
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_OTHER_ERROR;
  }
  ASSERT(rid == pSyncNode->rid);

M
Minghao Li 已提交
279
  if (pSyncNode->peersNum == 0) {
M
Minghao Li 已提交
280 281 282 283 284 285
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_OTHER_ERROR;
  }

  SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
286

M
Minghao Li 已提交
287
  int32_t ret = syncLeaderTransferTo(rid, newLeader);
288 289 290 291 292 293
  return ret;
}

/*
 * Propose a leader-transfer entry naming newLeader on the node identified by
 * rid.
 *
 * Returns the syncNodePropose() result. Returns TAOS_SYNC_ONLY_ONE_REPLICA for
 * a single-replica group, or TAOS_SYNC_OTHER_ERROR if rid is unknown or the
 * transfer message cannot be built.
 */
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_OTHER_ERROR;
  }
  ASSERT(rid == pSyncNode->rid);

  if (pSyncNode->replicaNum == 1) {
    sError("only one replica, cannot drop leader");
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_ONLY_ONE_REPLICA;
  }

  SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
  // Check before the first dereference. Previously the assert came after pMsg
  // had already been written through.
  if (pMsg == NULL) {
    sError("vgId:%d, failed to build leader transfer msg", pSyncNode->vgId);
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return TAOS_SYNC_OTHER_ERROR;
  }
  pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
  pMsg->newLeaderId.vgId = pSyncNode->vgId;
  pMsg->newNodeInfo = newLeader;

  SRpcMsg rpcMsg = {0};
  syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
  syncLeaderTransferDestroy(pMsg);

  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

bool syncCanLeaderTransfer(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);

  if (pSyncNode->replicaNum == 1) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return false;
  }

  if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
    taosReleaseRef(tsNodeRefId, pSyncNode->rid);
    return true;
  }

  bool matchOK = true;
  if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncIndex myCommitIndex = pSyncNode->commitIndex;
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
      if (peerMatchIndex < myCommitIndex) {
        matchOK = false;
      }
    }
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return matchOK;
}

M
Minghao Li 已提交
351 352 353 354
/* Thin alias for syncPropose(): forwards pMsg to the node identified by rid. */
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
  return syncPropose(rid, pMsg, isWeak);
}
M
Minghao Li 已提交
355

M
Minghao Li 已提交
356 357 358 359 360 361
ESyncState syncGetMyRole(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
362 363 364 365
  ESyncState state = pSyncNode->state;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return state;
M
Minghao Li 已提交
366 367
}

M
Minghao Li 已提交
368 369 370 371 372 373 374 375 376 377 378 379
bool syncIsRestoreFinish(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return false;
  }
  assert(rid == pSyncNode->rid);
  bool b = pSyncNode->restoreFinish;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return b;
}

380 381 382 383 384 385
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return -1;
  }
  assert(rid == pSyncNode->rid);
386 387 388
  sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;

  sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex);
389 390 391 392 393

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return 0;
}

M
Minghao Li 已提交
394 395 396 397 398
/* Printable name of the node's raft state (see syncGetMyRole()). */
const char* syncGetMyRoleStr(int64_t rid) {
  return syncUtilState2String(syncGetMyRole(rid));
}

M
Minghao Li 已提交
399 400 401 402 403 404 405 406 407 408 409 410
int32_t syncGetVgId(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  int32_t vgId = pSyncNode->vgId;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return vgId;
}

M
Minghao Li 已提交
411
SyncTerm syncGetMyTerm(int64_t rid) {
M
Minghao Li 已提交
412 413
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
414 415 416 417 418 419 420 421 422
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);
  SyncTerm term = pSyncNode->pRaftStore->currentTerm;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return term;
}

M
Minghao Li 已提交
423 424 425 426 427 428 429 430 431
/*
 * Fill pEpSet with the FQDN and port of every replica in the node's raft
 * config, and set inUse to this node's own index.
 *
 * If rid is unknown, pEpSet is zeroed.
 */
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    memset(pEpSet, 0, sizeof(*pEpSet));
    return;
  }
  assert(rid == pNode->rid);

  pEpSet->numOfEps = 0;
  for (int idx = 0; idx < pNode->pRaftCfg->cfg.replicaNum; ++idx) {
    snprintf(pEpSet->eps[idx].fqdn, sizeof(pEpSet->eps[idx].fqdn), "%s",
             pNode->pRaftCfg->cfg.nodeInfo[idx].nodeFqdn);
    pEpSet->eps[idx].port = pNode->pRaftCfg->cfg.nodeInfo[idx].nodePort;
    pEpSet->numOfEps++;

    sInfo("syncGetEpSet index:%d %s:%d", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);
  }
  pEpSet->inUse = pNode->pRaftCfg->cfg.myIndex;

  sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);

  taosReleaseRef(tsNodeRefId, pNode->rid);
}

M
Minghao Li 已提交
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
/*
 * Look up the response stub registered under index and copy its rpc message
 * into msg. The stub is left in place.
 *
 * Returns the syncRespMgrGet() result; msg is written only when that result
 * is 1. Returns TAOS_SYNC_STATE_ERROR if rid is unknown.
 */
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
  SSyncNode* pNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pNode->rid);

  SRespStub found;
  int32_t   rc = syncRespMgrGet(pNode->pSyncRespMgr, index, &found);
  if (rc == 1) {
    memcpy(msg, &found.rpcMsg, sizeof(SRpcMsg));
  }

  taosReleaseRef(tsNodeRefId, pNode->rid);
  return rc;
}

S
Shengliang Guan 已提交
462
/*
 * Remove the response stub registered under index and copy its rpc handle
 * info into pInfo.
 *
 * Returns the syncRespMgrGetAndDel() result; pInfo is written only when that
 * result is 1. Returns TAOS_SYNC_STATE_ERROR if rid is unknown.
 */
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return TAOS_SYNC_STATE_ERROR;
  }
  assert(rid == pSyncNode->rid);

  SRespStub stub;
  int32_t   ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
  if (ret == 1) {
    *pInfo = stub.rpcMsg.info;
    sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
  } else {
    // pInfo was not filled in, so its handle is whatever the caller left
    // there; do not log it.
    sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle not found, ret:%d", pSyncNode->vgId, index, ret);
  }

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

480
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
M
Minghao Li 已提交
481 482 483 484 485 486
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
    return;
  }
  assert(rid == pSyncNode->rid);
S
Shengliang Guan 已提交
487
  pSyncNode->msgcb = msgcb;
M
Minghao Li 已提交
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

/*
 * Short textual description of the node identified by rid, as produced by
 * syncNode2SimpleStr(). Returns NULL if rid is unknown.
 */
char* sync2SimpleStr(int64_t rid) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    // rid is int64_t: "%ld" is wrong where long is 32 bits. The old message
    // also named the wrong function ("syncSetRpc").
    sTrace("sync2SimpleStr get pSyncNode is NULL, rid:%" PRId64, rid);
    return NULL;
  }
  assert(rid == pSyncNode->rid);
  char* s = syncNode2SimpleStr(pSyncNode);
  taosReleaseRef(tsNodeRefId, pSyncNode->rid);

  return s;
}

void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->pingBaseLine = pingTimerMS;
  pSyncNode->pingTimerMS = pingTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->electBaseLine = electTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
  if (pSyncNode == NULL) {
    return;
  }
  assert(rid == pSyncNode->rid);
  pSyncNode->hbBaseLine = hbTimerMS;
  pSyncNode->heartbeatTimerMS = hbTimerMS;

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}

int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
541
  int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
M
Minghao Li 已提交
542

543
  SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
544
  if (pSyncNode == NULL) {
M
Minghao Li 已提交
545
    return TAOS_SYNC_OTHER_ERROR;
546
  }
M
Minghao Li 已提交
547
  assert(rid == pSyncNode->rid);
M
Minghao Li 已提交
548
  sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
549 550 551 552 553 554 555 556 557
  ret = syncNodePropose(pSyncNode, pMsg, isWeak);

  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
  return ret;
}

/*
 * Propose pMsg on pSyncNode.
 *
 * Only a leader accepts proposals. It registers a response stub for pMsg,
 * wraps pMsg in a SyncClientRequest tagged with the stub's sequence number,
 * and enqueues it through pSyncNode->FpEqMsg.
 *
 * Returns:
 *   TAOS_SYNC_PROPOSE_SUCCESS    - the request was enqueued;
 *   TAOS_SYNC_PROPOSE_NOT_LEADER - this node is not the leader;
 *   TAOS_SYNC_OTHER_ERROR        - FpEqMsg is NULL or enqueueing failed.
 *                                  Previously both cases were reported as
 *                                  success.
 */
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
  sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));

  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
    return TAOS_SYNC_PROPOSE_NOT_LEADER;
  }

  SRespStub stub;
  stub.createTime = taosGetTimestampMs();
  stub.rpcMsg = *pMsg;
  uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);

  SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
  SRpcMsg            rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);

  int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
  if (pSyncNode->FpEqMsg == NULL) {
    sError("syncPropose pSyncNode->FpEqMsg is NULL");
    ret = TAOS_SYNC_OTHER_ERROR;
  } else if ((*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) != 0) {
    sError("vgId:%d, syncPropose failed to enqueue client request, seq:%" PRIu64, pSyncNode->vgId, seqNum);
    ret = TAOS_SYNC_OTHER_ERROR;
  }
  // NOTE(review): on failure the stub added above stays registered in
  // pSyncRespMgr, and ownership of rpcMsg.pCont after a failed enqueue is not
  // visible here. Confirm whether either needs explicit cleanup.

  syncClientRequestDestroy(pSyncMsg);
  return ret;
}

M
Minghao Li 已提交
583
// open/close --------------
584 585 586
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
  SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;

587
  sDebug("vgId:%d sync event sync open", pSyncInfo->vgId);
588

wafwerar's avatar
wafwerar 已提交
589
  SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
M
Minghao Li 已提交
590
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
591
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
592

M
Minghao Li 已提交
593 594 595 596 597 598 599
  int32_t ret = 0;
  if (!taosDirExist((char*)(pSyncInfo->path))) {
    if (taosMkDir(pSyncInfo->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
      return NULL;
    }
600
  }
M
Minghao Li 已提交
601

602 603
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
  if (!taosCheckExistFile(pSyncNode->configPath)) {
M
Minghao Li 已提交
604 605 606 607
    // create a new raft config file
    SRaftCfgMeta meta;
    meta.isStandBy = pSyncInfo->isStandBy;
    meta.snapshotEnable = pSyncInfo->snapshotEnable;
608
    meta.lastConfigIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
609
    ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
M
Minghao Li 已提交
610
    assert(ret == 0);
611 612 613 614 615 616 617

  } else {
    // update syncCfg by raft_config.json
    pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
    assert(pSyncNode->pRaftCfg != NULL);
    pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;

618 619 620 621 622
    if (gRaftDetailLog) {
      char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
      sInfo("syncNodeOpen update config :%s", seralized);
      taosMemoryFree(seralized);
    }
623 624

    raftCfgClose(pSyncNode->pRaftCfg);
M
Minghao Li 已提交
625 626
  }

M
Minghao Li 已提交
627
  // init by SSyncInfo
M
Minghao Li 已提交
628 629
  pSyncNode->vgId = pSyncInfo->vgId;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
630
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
631 632
  snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);

M
Minghao Li 已提交
633
  pSyncNode->pWal = pSyncInfo->pWal;
S
Shengliang Guan 已提交
634
  pSyncNode->msgcb = pSyncInfo->msgcb;
M
Minghao Li 已提交
635
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
636
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
637

M
Minghao Li 已提交
638 639 640 641
  // init raft config
  pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
  assert(pSyncNode->pRaftCfg != NULL);

M
Minghao Li 已提交
642
  // init internal
M
Minghao Li 已提交
643 644
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
645

M
Minghao Li 已提交
646
  // init peersNum, peers, peersId
M
Minghao Li 已提交
647
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
M
Minghao Li 已提交
648
  int j = 0;
M
Minghao Li 已提交
649 650 651
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
M
Minghao Li 已提交
652 653 654
      j++;
    }
  }
M
Minghao Li 已提交
655
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
656
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
657
  }
M
Minghao Li 已提交
658

M
Minghao Li 已提交
659
  // init replicaNum, replicasId
M
Minghao Li 已提交
660 661 662
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
M
Minghao Li 已提交
663 664
  }

M
Minghao Li 已提交
665
  // init raft algorithm
M
Minghao Li 已提交
666
  pSyncNode->pFsm = pSyncInfo->pFsm;
M
Minghao Li 已提交
667
  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
M
Minghao Li 已提交
668 669
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
670
  // init life cycle outside
M
Minghao Li 已提交
671

M
Minghao Li 已提交
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
696
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
697
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
698
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
699 700
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
701
  // init TLA+ candidate vars
M
Minghao Li 已提交
702 703 704 705 706
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
707 708 709 710 711 712 713 714 715
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
716
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
717

M
Minghao Li 已提交
718 719 720 721 722
  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
  pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS;

M
Minghao Li 已提交
723
  // init ping timer
M
Minghao Li 已提交
724
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
725
  pSyncNode->pingTimerMS = pSyncNode->pingBaseLine;
M
Minghao Li 已提交
726 727
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
728
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
729
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
730

M
Minghao Li 已提交
731 732
  // init elect timer
  pSyncNode->pElectTimer = NULL;
M
Minghao Li 已提交
733
  pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
M
Minghao Li 已提交
734 735
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
736
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
737 738 739 740
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
M
Minghao Li 已提交
741
  pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
M
Minghao Li 已提交
742 743
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
744
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
745 746
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
747
  // init callback
M
Minghao Li 已提交
748 749
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
750
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
751
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
752

M
Minghao Li 已提交
753 754 755
  pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb;
  pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb;

M
Minghao Li 已提交
756
  if (pSyncNode->pRaftCfg->snapshotEnable) {
M
Minghao Li 已提交
757
    sInfo("sync node use snapshot");
M
Minghao Li 已提交
758 759 760 761
    pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
    pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
    pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesSnapshotCb;
    pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplySnapshotCb;
M
Minghao Li 已提交
762 763 764 765 766 767 768

  } else {
    sInfo("sync node do not use snapshot");
    pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
    pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
    pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
    pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
769 770
  }

M
Minghao Li 已提交
771
  // tools
M
Minghao Li 已提交
772
  pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, 0);
M
Minghao Li 已提交
773 774
  assert(pSyncNode->pSyncRespMgr != NULL);

775 776
  // restore state
  pSyncNode->restoreFinish = false;
777 778 779 780 781 782

  // pSyncNode->pSnapshot = NULL;
  // if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
  //   pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
  //   pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
  // }
783
  // tsem_init(&(pSyncNode->restoreSem), 0, 0);
784

M
Minghao Li 已提交
785 786 787 788 789 790 791 792
  // snapshot senders
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
    // ASSERT(pSender != NULL);
    (pSyncNode->senders)[i] = pSender;
  }

  // snapshot receivers
793
  pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
M
Minghao Li 已提交
794

M
Minghao Li 已提交
795
  // start in syncNodeStart
M
Minghao Li 已提交
796
  // start raft
M
Minghao Li 已提交
797
  // syncNodeBecomeFollower(pSyncNode);
M
Minghao Li 已提交
798

799
  // snapshot meta
800
  // pSyncNode->sMeta.lastConfigIndex = -1;
801

M
Minghao Li 已提交
802 803 804
  return pSyncNode;
}

M
Minghao Li 已提交
805 806
/*
 * Start the raft state machine of a freshly opened node.
 *
 * A single-replica group has nobody to hold an election with, so the node
 * advances its term, becomes leader immediately, appends a no-op entry and tries
 * to advance the commit index. Any other group starts as a follower;
 * syncNodeBecomeFollower resets the election timer.
 */
void syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum == 1) {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms: append a no-op in the new term
    syncNodeAppendNoop(pSyncNode);
    syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica

    if (gRaftDetailLog) {
      syncNodeLog2("==state change become leader immediately==", pSyncNode);
    }
    return;
  }

  syncNodeBecomeFollower(pSyncNode, "first start");

  // This path used to log "become leader immediately" although the node has just become a follower.
  if (gRaftDetailLog) {
    syncNodeLog2("==state change become follower==", pSyncNode);
  }
}

M
Minghao Li 已提交
835 836 837 838 839 840 841 842 843 844 845
/*
 * Enter stand-by mode: the node becomes a follower, its heartbeat timer is
 * stopped and its election timer is re-armed with the longest timeout
 * (TIMER_MAX_MS), which defers any election for as long as possible.
 */
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t code = syncNodeRestartElectTimer(pSyncNode, TIMER_MAX_MS);
  ASSERT(code == 0);
}

M
Minghao Li 已提交
846
/*
 * Close a sync node: stop its timers and release the sub-objects it owns.
 *
 * The SSyncNode struct itself is not freed here (that happens in syncFreeNode).
 * Every released field is reset to NULL so a stale pointer cannot be released
 * or used a second time.
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  // Validate before the first dereference (the log line below reads vgId).
  assert(pSyncNode != NULL);
  sDebug("vgId:%d sync event sync close", pSyncNode->vgId);

  // Stop timers before tearing anything down. The stop helpers only touch
  // fields of pSyncNode itself. Stopping first narrows the window in which a
  // timer callback could observe already-released state.
  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

  int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);
  (void)ret;  // only checked by assert
  pSyncNode->pRaftStore = NULL;

  syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
  pSyncNode->pSyncRespMgr = NULL;
  voteGrantedDestroy(pSyncNode->pVotesGranted);
  pSyncNode->pVotesGranted = NULL;
  votesRespondDestory(pSyncNode->pVotesRespond);
  pSyncNode->pVotesRespond = NULL;
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  pSyncNode->pNextIndex = NULL;
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  pSyncNode->pMatchIndex = NULL;
  logStoreDestory(pSyncNode->pLogStore);
  pSyncNode->pLogStore = NULL;
  raftCfgClose(pSyncNode->pRaftCfg);
  pSyncNode->pRaftCfg = NULL;

  if (pSyncNode->pFsm != NULL) {
    taosMemoryFree(pSyncNode->pFsm);
    pSyncNode->pFsm = NULL;
  }

  // snapshot senders
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] != NULL) {
      snapshotSenderDestroy((pSyncNode->senders)[i]);
      (pSyncNode->senders)[i] = NULL;
    }
  }

  // snapshot receiver
  if (pSyncNode->pNewNodeReceiver != NULL) {
    snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
    pSyncNode->pNewNodeReceiver = NULL;
  }

  // free memory in syncFreeNode
}

M
Minghao Li 已提交
895 896 897
// option
/* Whether snapshot support is enabled in this node's raft configuration. */
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) {
  bool enabled = pSyncNode->pRaftCfg->snapshotEnable;
  return enabled;
}

M
Minghao Li 已提交
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
// ping --------------
/*
 * Wrap a ping message into an RPC message and send it to destRaftId.
 * Both the ping and the RPC form are logged. Returns the result of
 * syncNodeSendMsgById.
 */
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg);

  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

/* Send a ping from this node to itself; asserts the send succeeded. */
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId);
  int32_t   code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);

  syncPingDestroy(pPing);
  return code;
}

/*
 * Ping every peer (all replicas except this node).
 * Returns the result of the last send, or 0 when there are no peers.
 */
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t code = 0;
  for (int idx = 0; idx < pSyncNode->peersNum; idx++) {
    SRaftId*  pDest = &pSyncNode->peersId[idx];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
  return code;
}

/*
 * Ping every replica of the current configuration, this node included.
 * Returns the result of the last send, or 0 when the configuration is empty.
 */
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t code = 0;
  for (int idx = 0; idx < pSyncNode->pRaftCfg->cfg.replicaNum; idx++) {
    SRaftId*  pDest = &pSyncNode->replicasId[idx];
    SyncPing* pPing = syncPingBuild3(&pSyncNode->myRaftId, pDest, pSyncNode->vgId);

    code = syncNodePing(pSyncNode, pDest, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
  return code;
}

// timer control --------------
/*
 * (Re)arm the ping timer with pingTimerMS and record the user-side logic
 * clock as the clock of this arming. Always returns 0.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return 0;
}

/*
 * Cancel the ping timer. The user-side logic clock is bumped first, then the
 * timer is stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return 0;
}

/*
 * (Re)arm the election timer to fire after `ms` milliseconds.
 * The timeout is stored in electTimerMS, and the user-side logic clock is
 * recorded as the clock of this arming. Always returns 0.
 */
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return 0;
}

/*
 * Cancel the election timer. The user-side logic clock is bumped first, then
 * the timer is stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return 0;
}

/* Stop the election timer and re-arm it with a timeout of `ms` milliseconds. Always returns 0. */
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)syncNodeStopElectTimer(pSyncNode);
  (void)syncNodeStartElectTimer(pSyncNode, ms);
  return 0;
}

M
Minghao Li 已提交
986 987
/*
 * Restart the election timer with a fresh timeout.
 * A stand-by node uses TIMER_MAX_MS. Any other node draws a random timeout
 * from syncUtilElectRandomMS over [electBaseLine, 2 * electBaseLine].
 */
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t electMS = pSyncNode->pRaftCfg->isStandBy
                        ? TIMER_MAX_MS
                        : syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
  return syncNodeRestartElectTimer(pSyncNode, electMS);
}

M
Minghao Li 已提交
999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
/*
 * (Re)arm the heartbeat timer with heartbeatTimerMS and record the user-side
 * logic clock as the clock of this arming. Always returns 0.
 */
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return 0;
}

/*
 * Cancel the heartbeat timer. The user-side logic clock is bumped first, then
 * the timer is stopped and its handle cleared. Always returns 0.
 */
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return 0;
}

// utils --------------
/*
 * Send pMsg to the endpoint derived from destRaftId via pSyncNode->FpSendMsg.
 *
 * When gRaftDetailLog is set, the message is also traced as one-line JSON. The
 * body is converted with syncUtilMsgHtoN ("htonl") and the message is marked
 * noResp before sending. If no send callback is installed, only a trace line
 * is emitted. Always returns 0.
 */
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  if (gRaftDetailLog) {
    char* pJsonLine = syncRpcMsg2Str(pMsg);
    syncUtilJson2Line(pJsonLine);
    sTrace("sync send msg, vgId:%d, type:%d, msg:%s", pSyncNode->vgId, pMsg->msgType, pJsonLine);
    taosMemoryFree(pJsonLine);
  }

  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

/*
 * Send pMsg to the endpoint described by nodeInfo via pSyncNode->FpSendMsg.
 * The body is converted with syncUtilMsgHtoN ("htonl") and marked noResp
 * before sending. If no send callback is installed, only a trace line is
 * emitted. Always returns 0.
 */
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);

  if (pSyncNode->FpSendMsg == NULL) {
    sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
    return 0;
  }

  syncUtilMsgHtoN(pMsg->pCont);
  pMsg->info.noResp = 1;
  pSyncNode->FpSendMsg(&epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
1053
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
C
Cary Xu 已提交
1054
  char   u64buf[128] = {0};
M
Minghao Li 已提交
1055 1056
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
1057 1058 1059
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
M
Minghao Li 已提交
1060
    cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg));
M
Minghao Li 已提交
1061
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
M
Minghao Li 已提交
1062 1063 1064
    cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath);
    cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath);

M
Minghao Li 已提交
1065 1066 1067
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

S
Shengliang Guan 已提交
1068
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1069 1070 1071 1072
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

S
Shengliang Guan 已提交
1073
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
M
Minghao Li 已提交
1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
1095

M
Minghao Li 已提交
1096 1097 1098 1099 1100 1101
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
1102

M
Minghao Li 已提交
1103 1104 1105 1106 1107 1108 1109
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

M
Minghao Li 已提交
1110 1111 1112 1113
    // life cycle
    snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
    cJSON_AddStringToObject(pRoot, "rid", u64buf);

M
Minghao Li 已提交
1114 1115 1116
    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
1117
    cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
M
Minghao Li 已提交
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
1129
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
1130 1131
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

M
Minghao Li 已提交
1132 1133 1134 1135 1136
    // timer ms init
    cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine);
    cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine);
    cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine);

M
Minghao Li 已提交
1137 1138 1139 1140
    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
1141
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
1142
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
1143
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1144 1145 1146
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
1147
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
1148 1149 1150 1151 1152 1153
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
1154
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
1155
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
1156
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1157 1158 1159
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
1160
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
1161 1162 1163 1164 1165 1166
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
1167
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
1168
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
1169
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
1170 1171 1172
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
1173
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203

    // restoreFinish
    cJSON_AddNumberToObject(pRoot, "restoreFinish", pSyncNode->restoreFinish);

    // snapshot senders
    cJSON* pSenders = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "senders", pSenders);
    for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
      cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i]));
    }

    // snapshot receivers
    cJSON* pReceivers = cJSON_CreateArray();
1204
    cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
M
Minghao Li 已提交
1205 1206 1207 1208 1209 1210 1211
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
1212 1213 1214 1215 1216 1217 1218
/*
 * Render a sync node as pretty-printed JSON text.
 * The returned string is allocated by cJSON_Print and owned by the caller.
 */
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pTree = syncNode2Json(pSyncNode);
  char*  pText = cJSON_Print(pTree);
  cJSON_Delete(pTree);
  return pText;
}

M
Minghao Li 已提交
1219 1220 1221 1222
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
  int   len = 256;
  char* s = (char*)taosMemoryMalloc(len);
  snprintf(s, len,
M
Minghao Li 已提交
1223
           "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
1224
           "electTimerLogicClock:%lu, "
M
Minghao Li 已提交
1225
           "electTimerLogicClockUser:%lu, "
M
Minghao Li 已提交
1226
           "electTimerMS:%d, replicaNum:%d",
M
Minghao Li 已提交
1227
           pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
1228
           syncUtilState2String(pSyncNode->state), pSyncNode->pRaftCfg->isStandBy, pSyncNode->electTimerLogicClock,
M
Minghao Li 已提交
1229
           pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, pSyncNode->replicaNum);
M
Minghao Li 已提交
1230 1231 1232
  return s;
}

1233
/*
 * Apply a new membership configuration to a running sync node.
 *
 * Copies newConfig into the raft cfg and rebuilds the derived state:
 * my/peer/replica tables, index and vote managers, and quorum. Snapshot
 * senders are remapped so that a replica present in both the old and the new
 * configuration keeps its sender. Old senders that are no longer used are
 * destroyed, and empty slots get new senders. The cfg is then persisted.
 *
 * @param pSyncNode              node to reconfigure
 * @param newConfig              configuration to adopt (copied by value)
 * @param lastConfigChangeIndex  stored as pRaftCfg->lastConfigIndex
 * @param isDrop                 [out] true iff this node is in the old config but not in the new one
 */
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
  SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
  pSyncNode->pRaftCfg->cfg = *newConfig;
  pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;

  // save snapshot senders together with the replica ids they served
  int32_t oldReplicaNum = pSyncNode->replicaNum;
  SRaftId oldReplicasId[TSDB_MAX_REPLICA];
  memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
  SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    oldSenders[i] = (pSyncNode->senders)[i];
    sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]);
  }

  // init internal
  pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);

  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
  int peerIdx = 0;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
      pSyncNode->peersNodeInfo[peerIdx] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
      peerIdx++;
    }
  }
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
  }

  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
  for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
  }

  syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
  syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
  voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
  votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);

  pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);

  // reset snapshot senders: start from empty slots
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    (pSyncNode->senders)[i] = NULL;
  }

  // Reuse the old sender of every replica that is in both configurations.
  // Only ids of the old configuration are considered (slots >= oldReplicaNum are
  // stale). Each old sender is handed out at most once (break + NULL check).
  // Previously a second match overwrote senders[i] and leaked the first sender,
  // and a NULL old sender was dereferenced.
  for (int i = 0; i < pSyncNode->replicaNum; ++i) {
    for (int j = 0; j < oldReplicaNum && j < TSDB_MAX_REPLICA; ++j) {
      if (oldSenders[j] == NULL || !syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
        continue;
      }

      char     host[128];
      uint16_t port;
      syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
      sDebug("vgId:%d sync event reset sender for %" PRIu64 ", newIndex:%d, %s:%d, %p", pSyncNode->vgId,
             (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);

      (pSyncNode->senders)[i] = oldSenders[j];
      oldSenders[j] = NULL;

      // reset replicaIndex
      int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
      (pSyncNode->senders)[i]->replicaIndex = i;
      sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId,
             oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], 1);
      break;
    }
  }

  // create new senders for every slot still empty
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if ((pSyncNode->senders)[i] == NULL) {
      (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
      sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i);
    }
  }

  // free old senders that were not reused (log before destroying so no dangling pointer is printed)
  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
    if (oldSenders[i] != NULL) {
      sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i);
      snapshotSenderDestroy(oldSenders[i]);
      oldSenders[i] = NULL;
    }
  }

  // membership of this node (matched by fqdn + port) in the old and new configuration
  bool IamInOld = false;
  for (int i = 0; i < oldConfig.replicaNum; ++i) {
    if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
      IamInOld = true;
      break;
    }
  }

  bool IamInNew = false;
  for (int i = 0; i < newConfig->replicaNum; ++i) {
    if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
        (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
      IamInNew = true;
      break;
    }
  }

  *isDrop = IamInOld && !IamInNew;

  if (IamInNew) {
    pSyncNode->pRaftCfg->isStandBy = 0;  // change isStandBy to normal
  }
  raftCfgPersist(pSyncNode->pRaftCfg);

  if (gRaftDetailLog) {
    syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
  }
}

M
Minghao Li 已提交
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
/*
 * Take a reference on the sync node registered under `rid`.
 * Returns NULL (and emits a trace) when no node can be acquired. A successful
 * acquire must be paired with syncNodeRelease.
 */
SSyncNode* syncNodeAcquire(int64_t rid) {
  SSyncNode* pAcquired = taosAcquireRef(tsNodeRefId, rid);
  if (pAcquired != NULL) {
    return pAcquired;
  }

  sTrace("failed to acquire node from refId:%" PRId64, rid);
  return NULL;
}

/* Drop a reference previously obtained with syncNodeAcquire. */
void syncNodeRelease(SSyncNode* pNode) {
  int64_t rid = pNode->rid;
  taosReleaseRef(tsNodeRefId, rid);
}

M
Minghao Li 已提交
1377 1378 1379 1380
// raft state change --------------
/*
 * Adopt a newer term seen from another node.
 * When `term` exceeds the stored current term, the term is stored, the node
 * becomes a follower, and its vote is cleared. Otherwise nothing happens.
 */
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term <= pSyncNode->pRaftStore->currentTerm) {
    return;
  }

  raftStoreSetTerm(pSyncNode->pRaftStore, term);
  syncNodeBecomeFollower(pSyncNode, "update term");
  raftStoreClearVote(pSyncNode->pRaftStore);
}

1386
/*
 * Switch the node to follower state.
 * A node that was leader forgets itself as cached leader. The heartbeat timer
 * is stopped and the election timer is restarted via syncNodeResetElectTimer.
 *
 * @param debugStr reason for the transition, included in the debug log
 */
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
  sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId,
         pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);

  bool wasLeader = (pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  if (wasLeader) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeResetElectTimer(pSyncNode);
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
1421
/*
 * Switch the node to leader state (see the TLA+ BecomeLeader action above).
 *
 * Steps:
 * - Sets state and leaderCache.
 * - Sets every nextIndex to lastIndex + 1 and every matchIndex to SYNC_INDEX_INVALID.
 * - Raises this node's own snapshot sender privateTerm above all senders' and adds 100.
 * - Stops the election timer, replicates immediately and starts the heartbeat timer.
 *
 * @param debugStr reason for the transition, included in the debug log
 */
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId,
         pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);

  // state change
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;

  // set leader cache
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  // The last index comes from syncNodeGetLastIndexTerm rather than the log store
  // directly because the WAL may have been deleted. It does not depend on the
  // replica, so query it once instead of once per loop iteration. The defaults
  // keep the values defined if ASSERT is compiled out.
  SyncIndex lastIndex = SYNC_INDEX_INVALID;
  SyncTerm  lastTerm = 0;
  int32_t   code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
  ASSERT(code == 0);
  (void)code;

  // maybe overwrite myself, no harm
  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
    pSyncNode->pNextIndex->index[i] = lastIndex + 1;
  }

  // maybe overwrite myself, no harm
  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

  // update sender private term; sender slots may be NULL (creation is not asserted)
  SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
  if (pMySender != NULL) {
    for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
      SSyncSnapshotSender* pSender = (pSyncNode->senders)[i];
      if (pSender != NULL && pSender->privateTerm > pMySender->privateTerm) {
        pMySender->privateTerm = pSender->privateTerm;
      }
    }
    (pMySender->privateTerm) += 100;
  }

  // stop elect timer
  syncNodeStopElectTimer(pSyncNode);

  // start replicate right now!
  syncNodeReplicate(pSyncNode);

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
}

/*
 * Promote a candidate that holds a majority of votes to leader.
 * After the transition a no-op entry is appended in the new term (Raft 3.6.2,
 * committing entries from previous terms). The commit index is then advanced
 * if possible, e.g. for a single replica.
 */
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));

  syncNodeBecomeLeader(pSyncNode, "candidate to leader");
  syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);

  syncNodeAppendNoop(pSyncNode);
  syncMaybeAdvanceCommitIndex(pSyncNode);  // maybe only one replica
}

/*
 * Mark a follower as candidate. Only the state field changes here; timers and
 * votes are not touched by this function.
 */
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);

  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
  syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
}

/* Demote the current leader to follower and log the transition. */
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);

  syncNodeBecomeFollower(pSyncNode, "leader to follower");
  syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}

/* Return a candidate to follower state and log the transition. */
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);

  syncNodeBecomeFollower(pSyncNode, "candidate to follower");
  syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}

// raft vote --------------
M
Minghao Li 已提交
1511 1512 1513

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
1514 1515 1516 1517 1518 1519 1520
/*
 * Record a vote for pRaftId in the current term.
 * Preconditions (asserted): `term` is the current term and no vote has been cast yet.
 */
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
1521
// vote for ourselves, then feed a synthetic granted vote reply into the vote managers as if it came from outside
M
Minghao Li 已提交
1522 1523 1524
/*
 * Cast this node's vote for itself in the current term. Then register a
 * synthetic "vote granted" reply (self -> self) with both the granted-votes
 * and the responded-votes trackers. The synthetic reply is freed before
 * returning.
 */
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  SRaftId* pSelf = &(pSyncNode->myRaftId);
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, pSelf);

  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(pSyncNode->vgId);
  pReply->srcId = *pSelf;
  pReply->destId = *pSelf;
  pReply->term = pSyncNode->pRaftStore->currentTerm;
  pReply->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pReply);
  votesRespondAdd(pSyncNode->pVotesRespond, pReply);

  syncRequestVoteReplyDestroy(pReply);
}

M
Minghao Li 已提交
1536
// snapshot --------------
M
Minghao Li 已提交
1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548
/*
 * Return true when the FSM exposes a snapshot callback and the snapshot it
 * reports has lastApplyIndex >= SYNC_INDEX_BEGIN.
 */
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm->FpGetSnapshot == NULL) {
    return false;
  }

  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snap);
  return snap.lastApplyIndex >= SYNC_INDEX_BEGIN;
}

1549
/*
 * Return true when `index` is covered by the FSM snapshot, i.e.
 * index <= snapshot.lastApplyIndex.
 * Preconditions (asserted): a snapshot exists, the FSM has a snapshot
 * callback, and index >= SYNC_INDEX_BEGIN.
 */
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
  ASSERT(syncNodeHasSnapshot(pSyncNode));
  ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
  ASSERT(index >= SYNC_INDEX_BEGIN);

  // Initialize like every other snapshot reader in this file. A callback that
  // leaves fields untouched then reads as "no snapshot" (-1) rather than
  // stack garbage.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);

  return index <= snapshot.lastApplyIndex;
}

M
Minghao Li 已提交
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572
/*
 * Last index known to this node: the larger of the log store's last index
 * and the snapshot's lastApplyIndex. Without a snapshot callback, the
 * snapshot index is treated as -1.
 */
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
  SSnapshot snap = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snap);
  }

  SyncIndex logLast = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLast > snap.lastApplyIndex) {
    return logLast;
  }
  return snap.lastApplyIndex;
}

/*
 * Term of the last entry known to this node.
 * - No snapshot (no callback, or lastApplyIndex < SYNC_INDEX_BEGIN): the log
 *   store's last term.
 * - Snapshot present: the log's last term if the log extends past the
 *   snapshot, otherwise the snapshot's lastApplyTerm.
 */
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
  // Fetch the snapshot exactly once. The original asked syncNodeHasSnapshot
  // (one fetch) and then fetched again, so the existence check and the
  // values used could come from two different reads.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
  }

  if (snapshot.lastApplyIndex < SYNC_INDEX_BEGIN) {
    // no snapshot
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }

  // has snapshot
  SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  if (logLastIndex > snapshot.lastApplyIndex) {
    return pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
  }
  return snapshot.lastApplyTerm;
}

// get last index and term along with snapshot
/*
 * Write the last index and last term (both snapshot-aware) into the output
 * parameters. Always returns 0.
 */
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  SyncTerm  lastTerm = syncNodeGetLastTerm(pSyncNode);

  *pLastIndex = lastIndex;
  *pLastTerm = lastTerm;
  return 0;
}
M
Minghao Li 已提交
1601

M
Minghao Li 已提交
1602 1603 1604 1605 1606
/* First index that has not been written yet: one past syncNodeGetLastIndex. */
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
  return syncNodeGetLastIndex(pSyncNode) + 1;
}

M
Minghao Li 已提交
1607
/*
 * Index immediately preceding `index` (index - 1).
 * Preconditions (asserted): SYNC_INDEX_BEGIN <= index <= sync start index.
 */
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
  ASSERT(index >= SYNC_INDEX_BEGIN);

  // computed unconditionally (not inside ASSERT) so the call happens in every build
  SyncIndex startIndex = syncNodeSyncStartIndex(pSyncNode);
  ASSERT(index <= startIndex);

  return index - 1;
}

M
Minghao Li 已提交
1616 1617 1618 1619
/*
 * Read the term of the log entry stored at `index`. The entry is expected
 * to exist (asserted) and is released before returning.
 */
static SyncTerm syncNodeLogEntryTermAt(SSyncNode* pSyncNode, SyncIndex index) {
  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
  ASSERT(code == 0);
  ASSERT(pEntry != NULL);

  SyncTerm term = pEntry->term;
  syncEntryDestory(pEntry);
  return term;
}

/*
 * Term of the entry immediately preceding `index`.
 * - index == SYNC_INDEX_BEGIN: 0 (nothing precedes the first entry).
 * - index == snapshot.lastApplyIndex + 1: the snapshot's lastApplyTerm.
 * - otherwise: the term of log entry index - 1. If index falls inside the
 *   snapshot, an error is logged first ("bad scene"), since the snapshot may
 *   have changed underneath.
 * Preconditions (asserted): SYNC_INDEX_BEGIN <= index <= sync start index.
 */
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
  ASSERT(index >= SYNC_INDEX_BEGIN);
  SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
  ASSERT(index <= syncStartIndex);

  if (index == SYNC_INDEX_BEGIN) {
    return 0;
  }

  // Fetch the snapshot once, so the existence test and the boundary
  // comparisons below use the same read.
  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
    pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
  }

  if (snapshot.lastApplyIndex < SYNC_INDEX_BEGIN) {
    // no snapshot: the previous entry must be in the log
    ASSERT(index > SYNC_INDEX_BEGIN);
    return syncNodeLogEntryTermAt(pSyncNode, index - 1);
  }

  if (index == snapshot.lastApplyIndex + 1) {
    // previous entry is the last one folded into the snapshot
    return snapshot.lastApplyTerm;
  }

  if (index <= snapshot.lastApplyIndex) {
    // maybe snapshot change
    sError("sync get pre term, bad scene. index:%" PRId64, index);
    logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore);
  }

  // index lies past the snapshot (or in the bad scene above): read the log
  return syncNodeLogEntryTermAt(pSyncNode, index - 1);
}
1675

M
Minghao Li 已提交
1676 1677 1678
// get pre index and term of "index"
/*
 * Write the index and term of the entry preceding `index` into the output
 * parameters. Always returns 0.
 */
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
  SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
  *pPreIndex = preIndex;

  *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
  return 0;
}

M
Minghao Li 已提交
1683 1684 1685 1686 1687
// for debug --------------
/*
 * Debug helper: serialize the node with syncNode2Str, print it to stdout
 * with its length, flush all streams, then free the serialized string.
 */
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  // strlen returns size_t, so %zu is the matching conversion (%lu is wrong on LLP64)
  printf("syncNodePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/*
 * Debug helper: like syncNodePrint, but also prints the caller-supplied tag `s`.
 */
void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  // strlen returns size_t, so %zu is the matching conversion
  printf("syncNodePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

/*
 * Trace-log the serialized node (via sTraceLong) along with its length,
 * then free the serialized string.
 */
void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  // strlen returns size_t, so %zu is the matching conversion
  sTraceLong("syncNodeLog | len:%zu | %s", strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

/*
 * Trace-log the serialized node with a caller tag `s`. Does nothing unless
 * gRaftDetailLog is enabled, so the serialization cost is skipped otherwise.
 */
void syncNodeLog2(char* s, SSyncNode* pObj) {
  if (gRaftDetailLog) {
    char* serialized = syncNode2Str(pObj);
    // strlen returns size_t, so %zu is the matching conversion
    sTraceLong("syncNodeLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
    taosMemoryFree(serialized);
  }
}

M
Minghao Li 已提交
1712
// ------ local function ---------
M
Minghao Li 已提交
1713
// enqueue message ----
M
Minghao Li 已提交
1714 1715
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
1716
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
1717
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
M
Minghao Li 已提交
1718
                                              pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1719 1720
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1721
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
1722
    if (pSyncNode->FpEqMsg != NULL) {
S
Shengliang Guan 已提交
1723
      pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
M
Minghao Li 已提交
1724 1725 1726
    } else {
      sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
    }
M
Minghao Li 已提交
1727 1728
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1729
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
1730 1731
                 &pSyncNode->pPingTimer);
  } else {
1732
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1733
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
1734 1735 1736 1737 1738 1739 1740
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
M
Minghao Li 已提交
1741
                                              pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1742
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
1743
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1744
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
1745
    if (pSyncNode->FpEqMsg != NULL) {
1746 1747 1748 1749 1750 1751 1752
      int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
      if (code != 0) {
        sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
        rpcFreeCont(rpcMsg.pCont);
        syncTimeoutDestroy(pSyncMsg);
        return;
      }
M
Minghao Li 已提交
1753
    } else {
1754
      sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
M
Minghao Li 已提交
1755
    }
M
Minghao Li 已提交
1756 1757
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
1758
    // reset timer ms
1759 1760 1761 1762 1763 1764 1765
    if (gSyncEnv != NULL) {
      pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
      taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                   &pSyncNode->pElectTimer);
    } else {
      sError("sync env elect is already stop");
    }
M
Minghao Li 已提交
1766
  } else {
1767
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
1768
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
1769 1770 1771
  }
}

M
Minghao Li 已提交
1772 1773 1774 1775 1776 1777
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
M
Minghao Li 已提交
1778
                          pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
M
Minghao Li 已提交
1779 1780
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
1781
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
1782
    if (pSyncNode->FpEqMsg != NULL) {
1783 1784 1785 1786
      int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
      if (code != 0) {
        sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
        rpcFreeCont(rpcMsg.pCont);
1787
        syncTimeoutDestroy(pSyncMsg);
1788 1789
        return;
      }
M
Minghao Li 已提交
1790
    } else {
1791
      sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL");
M
Minghao Li 已提交
1792
    }
M
Minghao Li 已提交
1793 1794
    syncTimeoutDestroy(pSyncMsg);

1795
    if (gSyncEnv != NULL) {
1796 1797
      taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                   &pSyncNode->pHeartbeatTimer);
1798
    } else {
1799
      sError("sync env heartbeat is already stop");
1800
    }
M
Minghao Li 已提交
1801
  } else {
M
Minghao Li 已提交
1802 1803
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
1804 1805 1806 1807
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
1808 1809 1810 1811 1812 1813
/*
 * Leader-only: build a no-op entry for the current term at lastIndex + 1.
 * Wrap it in a client request and enqueue it through FpEqMsg.
 * Returns 0 on success, or the FpEqMsg error code if enqueueing failed.
 * All temporaries (entry, serialized buffer, request) are released here.
 */
static int32_t syncNodeEqNoop(SSyncNode* ths) {
  int32_t ret = 0;
  assert(ths->state == TAOS_SYNC_STATE_LEADER);

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  assert(pEntry != NULL);

  uint32_t           entryLen;
  char*              serialized = syncEntrySerialize(pEntry, &entryLen);
  SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
  assert(pSyncMsg->dataLen == entryLen);
  memcpy(pSyncMsg->data, serialized, entryLen);

  SRpcMsg rpcMsg = {0};
  syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
  if (ths->FpEqMsg != NULL) {
    int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsg);
    if (code != 0) {
      // rejected: the payload was not taken, release it and report the failure
      sError("vgId:%d sync enqueue noop msg error, code:%d", ths->vgId, code);
      rpcFreeCont(rpcMsg.pCont);
      ret = code;
    }
  } else {
    sTrace("syncNodeEqNoop pSyncMsg->FpEqMsg is NULL");
    rpcFreeCont(rpcMsg.pCont);
  }

  taosMemoryFree(serialized);
  syncClientRequestDestroy(pSyncMsg);
  // the entry was only needed for serialization; it was leaked before
  syncEntryDestory(pEntry);

  return ret;
}

/*
 * Leader-only: append a no-op entry for the current term at lastIndex + 1
 * directly to the log store, then start replication.
 * Non-leaders do nothing and return 0.
 * Returns the log store's append result; replication is skipped when the
 * append fails.
 */
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  if (ths->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
  assert(pEntry != NULL);

  int32_t ret = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
  if (ret != 0) {
    // nothing new reached the log, so there is nothing to replicate
    sError("vgId:%d sync append noop error, index:%" PRId64 ", code:%d", ths->vgId, index, ret);
  } else {
    syncNodeReplicate(ths);
  }

  syncEntryDestory(pEntry);
  return ret;
}

M
Minghao Li 已提交
1855
// on message ----
M
Minghao Li 已提交
1856 1857
/*
 * Handle an incoming ping.
 * Logs the local node state together with the message, then replies to
 * pMsg->srcId with a ping reply. Always returns 0.
 */
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  // log state (uint64_t values use PRIu64; %lu is not portable)
  char logBuf[1024] = {0};
  snprintf(logBuf, sizeof(logBuf),
           "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
           ", "
           "electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
           ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
           ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);

  int32_t ret = 0;
  syncPingLog2(logBuf, pMsg);
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);

  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // The reply has been serialized into rpcMsg and sent. The message object
  // itself was previously leaked.
  syncPingReplyDestroy(pMsgReply);
  return ret;
}

M
Minghao Li 已提交
1883
/* Handle a ping reply: it is only logged; no node state changes. Returns 0. */
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return 0;
}
M
Minghao Li 已提交
1888

M
Minghao Li 已提交
1889 1890 1891 1892 1893 1894 1895 1896 1897 1898
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
1899
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
M
Minghao Li 已提交
1900 1901 1902
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
1903
  SyncIndex       index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
M
Minghao Li 已提交
1904 1905 1906 1907
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
1908
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
1909 1910
    // ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
    ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
1911 1912

    // start replicate right now!
M
Minghao Li 已提交
1913
    syncNodeReplicate(ths);
M
Minghao Li 已提交
1914

M
Minghao Li 已提交
1915 1916 1917 1918
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1919
    if (ths->pFsm != NULL) {
1920
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
M
Minghao Li 已提交
1921
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1922 1923 1924 1925 1926 1927 1928
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 0;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1929
      }
M
Minghao Li 已提交
1930 1931 1932
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
1933 1934 1935
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
1936
  } else {
M
Minghao Li 已提交
1937 1938 1939 1940
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
1941
    if (ths->pFsm != NULL) {
1942
      // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
M
Minghao Li 已提交
1943
      if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
M
Minghao Li 已提交
1944 1945 1946 1947 1948 1949 1950
        SFsmCbMeta cbMeta;
        cbMeta.index = pEntry->index;
        cbMeta.isWeak = pEntry->isWeak;
        cbMeta.code = 1;
        cbMeta.state = ths->state;
        cbMeta.seqNum = pEntry->seqNum;
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
M
Minghao Li 已提交
1951
      }
M
Minghao Li 已提交
1952 1953
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
1954 1955
  }

M
Minghao Li 已提交
1956
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
1957
  return ret;
1958
}
M
Minghao Li 已提交
1959 1960 1961

static void syncFreeNode(void* param) {
  SSyncNode* pNode = param;
M
Minghao Li 已提交
1962 1963
  // inner object already free
  // syncNodePrint2((char*)"==syncFreeNode==", pNode);
M
Minghao Li 已提交
1964

wafwerar's avatar
wafwerar 已提交
1965
  taosMemoryFree(pNode);
M
Minghao Li 已提交
1966
}
S
Shengliang Guan 已提交
1967 1968 1969 1970

/*
 * Map a sync state to a static, human-readable name.
 * Unknown values map to "error".
 */
const char* syncStr(ESyncState state) {
  const char* name = "error";
  switch (state) {
    case TAOS_SYNC_STATE_FOLLOWER:
      name = "follower";
      break;
    case TAOS_SYNC_STATE_CANDIDATE:
      name = "candidate";
      break;
    case TAOS_SYNC_STATE_LEADER:
      name = "leader";
      break;
    default:
      break;
  }
  return name;
}
1980

1981
/*
 * Apply a committed leader-transfer entry.
 * If this node is the transfer target (matched by fqdn and port), restart its
 * election timer with a 1 ms delay so it campaigns right away. In either
 * case, forward the event to the FSM's FpLeaderTransferCb when one is
 * registered. Always returns 0.
 */
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);

  sDebug("vgId:%d sync event, begin leader transfer", ths->vgId);

  if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
      pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
    // addr is a 64-bit unsigned id; PRIu64 is the portable conversion (was %lu)
    sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %" PRIu64, ths->vgId,
           pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
           pSyncLeaderTransfer->newLeaderId.addr);

    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    SFsmCbMeta cbMeta;
    cbMeta.code = 0;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.flag = 0;
    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.state = ths->state;
    cbMeta.term = pEntry->term;
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}

/*
 * Apply a committed config-change entry.
 * The new config is parsed from the rpc payload. myIndex is set by matching
 * this node's fqdn/port.
 * - If this node is in the new config: apply it via syncNodeUpdateConfig and,
 *   unless it was dropped, re-enter the current role (leader stays leader,
 *   others become follower).
 * - If this node is not in the new config: become follower; the change is
 *   reported as a drop.
 * FpReConfigCb is always invoked with the old and new configs. Returns 0.
 */
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;

  SSyncCfg newSyncCfg;
  int32_t  ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
  ASSERT(ret == 0);

  // update new config myIndex
  bool IamInNew = false;
  for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
    if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
        ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
      newSyncCfg.myIndex = i;
      IamInNew = true;
      break;
    }
  }

  // isDrop is logged and handed to FpReConfigCb below, so it must be defined
  // on every path. Before, the !IamInNew path read it uninitialized (UB).
  bool isDrop = false;

  if (IamInNew) {
    syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);

    // change isStandBy to normal
    if (!isDrop) {
      char tmpbuf[128];
      snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
      if (ths->state == TAOS_SYNC_STATE_LEADER) {
        syncNodeBecomeLeader(ths, tmpbuf);
      } else {
        syncNodeBecomeFollower(ths, tmpbuf);
      }
    }
  } else {
    // this node is absent from the new configuration, i.e. it has been removed
    isDrop = true;

    char tmpbuf[128];
    snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
    syncNodeBecomeFollower(ths, tmpbuf);
  }

  if (gRaftDetailLog) {
    char* sOld = syncCfg2Str(&oldSyncCfg);
    char* sNew = syncCfg2Str(&newSyncCfg);
    // index is a 64-bit signed SyncIndex: PRId64 instead of %ld
    sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%" PRId64 " IamInNew:%d \n", sOld, sNew, isDrop,
          pEntry->index, IamInNew);
    taosMemoryFree(sOld);
    taosMemoryFree(sNew);
  }

  // always call FpReConfigCb
  if (ths->pFsm->FpReConfigCb != NULL) {
    SReConfigCbMeta cbMeta = {0};
    cbMeta.code = 0;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.index = pEntry->index;
    cbMeta.term = pEntry->term;
    cbMeta.newCfg = newSyncCfg;
    cbMeta.oldCfg = oldSyncCfg;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.flag = 0x11;
    cbMeta.isDrop = isDrop;
    ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
  }

  return 0;
}

2098
/*
 * Apply log entries [beginIndex, endIndex] (SYNC_INDEX_INVALID is skipped)
 * to the FSM. `flag` is logged as a sync state and forwarded in the commit
 * meta. For each entry:
 *   1. user commit callback (when registered and the rpc type is a user commit type)
 *   2. config change handling for TDMT_SYNC_CONFIG_CHANGE
 *   3. leader transfer handling for TDMT_SYNC_LEADER_TRANSFER
 *   4. the first time the log's last index is applied, the FSM restore-finish
 *      callback fires and restoreFinish is set
 * Does nothing when there is no FSM. Always returns 0.
 */
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
  ESyncState state = flag;
  sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
         endIndex, syncUtilState2String(state));

  // execute fsm
  if (ths->pFsm == NULL) {
    return 0;
  }

  for (SyncIndex idx = beginIndex; idx <= endIndex; ++idx) {
    if (idx == SYNC_INDEX_INVALID) {
      continue;
    }

    SSyncRaftEntry* pEntry = NULL;
    int32_t         code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, idx, &pEntry);
    ASSERT(code == 0);
    ASSERT(pEntry != NULL);

    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

    // user commit
    if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
      SFsmCbMeta cbMeta;
      cbMeta.index = pEntry->index;
      cbMeta.isWeak = pEntry->isWeak;
      cbMeta.code = 0;
      cbMeta.state = ths->state;
      cbMeta.seqNum = pEntry->seqNum;
      cbMeta.term = pEntry->term;
      cbMeta.currentTerm = ths->pRaftStore->currentTerm;
      cbMeta.flag = flag;
      ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
    }

    // config change
    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
      ASSERT(code == 0);
    }

    // leader transfer
    if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
      code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
      ASSERT(code == 0);
    }

    // restore finish: the last-index lookup is evaluated before the flag test
    if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore) && !ths->restoreFinish) {
      if (ths->pFsm->FpRestoreFinishCb != NULL) {
        ths->pFsm->FpRestoreFinishCb(ths->pFsm);
      }
      ths->restoreFinish = true;
      sDebug("vgId:%d sync event restore finish", ths->vgId);
    }

    rpcFreeCont(rpcMsg.pCont);
    syncEntryDestory(pEntry);
  }

  return 0;
}

/*
 * Return true when pRaftId matches one of the node's replica ids.
 * The scan stops at the first match.
 */
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  bool found = false;
  for (int r = 0; r < ths->replicaNum && !found; ++r) {
    found = syncUtilSameId(&((ths->replicasId)[r]), pRaftId);
  }
  return found;
}

/*
 * Return the snapshot sender paired with the replica whose id equals
 * pDestId, or NULL if no replica matches. If ids repeat, the highest
 * matching slot wins; the scan runs from the end for that reason.
 */
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
  for (int slot = ths->replicaNum - 1; slot >= 0; --slot) {
    if (syncUtilSameId(pDestId, &((ths->replicasId)[slot]))) {
      return (ths->senders)[slot];
    }
  }
  return NULL;
}