syncMain.c 29.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18 19
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
M
Minghao Li 已提交
20
#include "syncCommit.h"
M
Minghao Li 已提交
21
#include "syncElection.h"
M
Minghao Li 已提交
22
#include "syncEnv.h"
M
Minghao Li 已提交
23
#include "syncIndexMgr.h"
M
Minghao Li 已提交
24
#include "syncInt.h"
M
Minghao Li 已提交
25
#include "syncMessage.h"
M
Minghao Li 已提交
26
#include "syncRaftLog.h"
M
Minghao Li 已提交
27
#include "syncRaftStore.h"
M
Minghao Li 已提交
28
#include "syncReplication.h"
M
Minghao Li 已提交
29 30
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
M
Minghao Li 已提交
31
#include "syncTimeout.h"
M
Minghao Li 已提交
32
#include "syncUtil.h"
M
Minghao Li 已提交
33
#include "syncVoteMgr.h"
M
Minghao Li 已提交
34

M
Minghao Li 已提交
35 36 37
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
38
// enqueue message ----
M
Minghao Li 已提交
39 40 41 42
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);

M
Minghao Li 已提交
43
// on message ----
M
Minghao Li 已提交
44 45
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
46
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
M
Minghao Li 已提交
47 48 49
// ---------------------------------

int32_t syncInit() {
M
Minghao Li 已提交
50 51
  int32_t ret = syncEnvStart();
  return ret;
M
Minghao Li 已提交
52
}
M
Minghao Li 已提交
53

M
Minghao Li 已提交
54 55 56 57
void syncCleanUp() {
  int32_t ret = syncEnvStop();
  assert(ret == 0);
}
M
Minghao Li 已提交
58

M
Minghao Li 已提交
59
int64_t syncStart(const SSyncInfo* pSyncInfo) {
M
Minghao Li 已提交
60
  int32_t    ret = 0;
M
Minghao Li 已提交
61 62
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
63 64

  // todo : return ref id
M
Minghao Li 已提交
65
  return ret;
M
Minghao Li 已提交
66
}
M
Minghao Li 已提交
67

M
Minghao Li 已提交
68
void syncStop(int64_t rid) {
M
Minghao Li 已提交
69 70
  // todo : get pointer from rid
  SSyncNode* pSyncNode = NULL;
M
Minghao Li 已提交
71 72
  syncNodeClose(pSyncNode);
}
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75 76 77
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  int32_t ret = 0;
  return ret;
}
M
Minghao Li 已提交
78

M
Minghao Li 已提交
79
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
M
Minghao Li 已提交
80 81 82 83
  int32_t ret = 0;

  // todo : get pointer from rid
  SSyncNode* pSyncNode = NULL;
M
Minghao Li 已提交
84 85 86 87 88 89
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
    SRpcMsg            rpcMsg;
    syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncClientRequestDestroy(pSyncMsg);
M
Minghao Li 已提交
90 91
    ret = 0;

M
Minghao Li 已提交
92 93
  } else {
    sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
94
    ret = -1;  // todo : need define err code !!
M
Minghao Li 已提交
95
  }
M
Minghao Li 已提交
96
  return ret;
M
Minghao Li 已提交
97
}
M
Minghao Li 已提交
98

M
Minghao Li 已提交
99
ESyncState syncGetMyRole(int64_t rid) {
M
Minghao Li 已提交
100 101
  // todo : get pointer from rid
  SSyncNode* pSyncNode = NULL;
M
Minghao Li 已提交
102 103
  return pSyncNode->state;
}
M
Minghao Li 已提交
104

M
Minghao Li 已提交
105 106
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}

M
Minghao Li 已提交
107
// open/close --------------
M
Minghao Li 已提交
108 109 110
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
111
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
112

M
Minghao Li 已提交
113 114 115 116 117 118
  if (taosMkDir(pSyncInfo->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
    return NULL;
  }

M
Minghao Li 已提交
119
  // init by SSyncInfo
M
Minghao Li 已提交
120 121 122
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
M
Minghao Li 已提交
123
  snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
M
Minghao Li 已提交
124
  pSyncNode->pWal = pSyncInfo->pWal;
M
Minghao Li 已提交
125 126
  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
127 128
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
129

M
Minghao Li 已提交
130
  // init internal
M
Minghao Li 已提交
131 132
  pSyncNode->myNodeInfo = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncInfo->vgId, &pSyncNode->myRaftId);
M
Minghao Li 已提交
133

M
Minghao Li 已提交
134 135
  // init peersNum, peers, peersId
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
M
Minghao Li 已提交
136 137 138
  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
M
Minghao Li 已提交
139
      pSyncNode->peersNodeInfo[j] = pSyncInfo->syncCfg.nodeInfo[i];
M
Minghao Li 已提交
140 141 142
      j++;
    }
  }
M
Minghao Li 已提交
143
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
144
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncInfo->vgId, &pSyncNode->peersId[i]);
M
Minghao Li 已提交
145
  }
M
Minghao Li 已提交
146

M
Minghao Li 已提交
147 148 149 150 151 152
  // init replicaNum, replicasId
  pSyncNode->replicaNum = pSyncInfo->syncCfg.replicaNum;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]);
  }

M
Minghao Li 已提交
153
  // init raft algorithm
M
Minghao Li 已提交
154 155 156 157
  pSyncNode->pFsm = pSyncInfo->pFsm;
  pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
  pSyncNode->leaderCache = EMPTY_RAFT_ID;

M
Minghao Li 已提交
158
  // init life cycle
M
Minghao Li 已提交
159

M
Minghao Li 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
  // TLA+ Spec
  // InitHistoryVars == /\ elections = {}
  //                    /\ allLogs   = {}
  //                    /\ voterLog  = [i \in Server |-> [j \in {} |-> <<>>]]
  // InitServerVars == /\ currentTerm = [i \in Server |-> 1]
  //                   /\ state       = [i \in Server |-> Follower]
  //                   /\ votedFor    = [i \in Server |-> Nil]
  // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}]
  //                      /\ votesGranted   = [i \in Server |-> {}]
  // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the
  // \* leader does not send itself messages. It's still easier to include these
  // \* in the functions.
  // InitLeaderVars == /\ nextIndex  = [i \in Server |-> [j \in Server |-> 1]]
  //                   /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]]
  // InitLogVars == /\ log          = [i \in Server |-> << >>]
  //                /\ commitIndex  = [i \in Server |-> 0]
  // Init == /\ messages = [m \in {} |-> 0]
  //         /\ InitHistoryVars
  //         /\ InitServerVars
  //         /\ InitCandidateVars
  //         /\ InitLeaderVars
  //         /\ InitLogVars
  //

M
Minghao Li 已提交
184
  // init TLA+ server vars
M
syncInt  
Minghao Li 已提交
185
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
186
  pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
M
Minghao Li 已提交
187 188
  assert(pSyncNode->pRaftStore != NULL);

M
Minghao Li 已提交
189
  // init TLA+ candidate vars
M
Minghao Li 已提交
190 191 192 193 194
  pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
  assert(pSyncNode->pVotesGranted != NULL);
  pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
  assert(pSyncNode->pVotesRespond != NULL);

M
Minghao Li 已提交
195 196 197 198 199 200 201 202 203
  // init TLA+ leader vars
  pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pNextIndex != NULL);
  pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
  assert(pSyncNode->pMatchIndex != NULL);

  // init TLA+ log vars
  pSyncNode->pLogStore = logStoreCreate(pSyncNode);
  assert(pSyncNode->pLogStore != NULL);
M
Minghao Li 已提交
204
  pSyncNode->commitIndex = SYNC_INDEX_INVALID;
M
Minghao Li 已提交
205

M
Minghao Li 已提交
206
  // init ping timer
M
Minghao Li 已提交
207
  pSyncNode->pPingTimer = NULL;
M
Minghao Li 已提交
208 209 210
  pSyncNode->pingTimerMS = PING_TIMER_MS;
  atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
M
Minghao Li 已提交
211
  pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
M
Minghao Li 已提交
212
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
213

M
Minghao Li 已提交
214 215 216 217 218
  // init elect timer
  pSyncNode->pElectTimer = NULL;
  pSyncNode->electTimerMS = syncUtilElectRandomMS();
  atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
M
Minghao Li 已提交
219
  pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
M
Minghao Li 已提交
220 221 222 223 224 225 226
  pSyncNode->electTimerCounter = 0;

  // init heartbeat timer
  pSyncNode->pHeartbeatTimer = NULL;
  pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
M
Minghao Li 已提交
227
  pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
M
Minghao Li 已提交
228 229
  pSyncNode->heartbeatTimerCounter = 0;

M
Minghao Li 已提交
230
  // init callback
M
Minghao Li 已提交
231 232
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
M
Minghao Li 已提交
233
  pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
M
Minghao Li 已提交
234 235 236 237
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
238
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
239

M
Minghao Li 已提交
240 241 242
  // start raft
  syncNodeBecomeFollower(pSyncNode);

M
Minghao Li 已提交
243 244 245 246
  return pSyncNode;
}

void syncNodeClose(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
247
  int32_t ret;
M
Minghao Li 已提交
248
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262

  ret = raftStoreClose(pSyncNode->pRaftStore);
  assert(ret == 0);

  voteGrantedDestroy(pSyncNode->pVotesGranted);
  votesRespondDestory(pSyncNode->pVotesRespond);
  syncIndexMgrDestroy(pSyncNode->pNextIndex);
  syncIndexMgrDestroy(pSyncNode->pMatchIndex);
  logStoreDestory(pSyncNode->pLogStore);

  syncNodeStopPingTimer(pSyncNode);
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStopHeartbeatTimer(pSyncNode);

M
Minghao Li 已提交
263 264 265
  free(pSyncNode);
}

M
Minghao Li 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  syncPingLog2((char*)"==syncNodePing==", pMsg);
  int32_t ret = 0;

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg);

  ret = syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
  return ret;
}

int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
  int32_t   ret = 0;
  SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
  ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
  assert(ret == 0);

  syncPingDestroy(pMsg);
  return ret;
}

int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
    assert(ret == 0);
    syncPingDestroy(pMsg);
  }
  return ret;
}

int32_t syncNodePingAll(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
    assert(ret == 0);
    syncPingDestroy(pMsg);
  }
  return ret;
}

// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);
  atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pPingTimer);
  pSyncNode->pPingTimer = NULL;
  return ret;
}

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t ret = 0;
  pSyncNode->electTimerMS = ms;
  taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pElectTimer);
  atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pElectTimer);
  pSyncNode->pElectTimer = NULL;
  return ret;
}

int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  int32_t ret = 0;
  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode, ms);
  return ret;
}

M
Minghao Li 已提交
356 357 358 359 360 361 362
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  int32_t electMS = syncUtilElectRandomMS();
  ret = syncNodeRestartElectTimer(pSyncNode, electMS);
  return ret;
}

M
Minghao Li 已提交
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pHeartbeatTimer);
  atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  return ret;
}

int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
  taosTmrStop(pSyncNode->pHeartbeatTimer);
  pSyncNode->pHeartbeatTimer = NULL;
  return ret;
}

// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

M
Minghao Li 已提交
394 395 396 397
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
  char   u64buf[128];
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431
  if (pSyncNode != NULL) {
    // init by SSyncInfo
    cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
    cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
    cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
    cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue);
    cJSON_AddStringToObject(pRoot, "queue", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
    cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);

    // init internal
    cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
    cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
    cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
    cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);

    cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
    cJSON* pPeers = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
    }
    cJSON* pPeersId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
    for (int i = 0; i < pSyncNode->peersNum; ++i) {
      cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
    }
M
Minghao Li 已提交
432

M
Minghao Li 已提交
433 434 435 436 437 438
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
    cJSON* pReplicasId = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
    for (int i = 0; i < pSyncNode->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
    }
M
Minghao Li 已提交
439

M
Minghao Li 已提交
440 441 442 443 444 445 446 447 448 449
    // raft algorithm
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
    cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
    cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
    cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
    cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);

    // tla+ server vars
    cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
    cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
M
Minghao Li 已提交
450 451 452
    char tmpBuf[RAFT_STORE_BLOCK_SIZE];
    raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf));
    cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf);
M
Minghao Li 已提交
453 454 455 456 457 458 459 460 461 462 463

    // tla+ candidate vars
    cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
    cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));

    // tla+ leader vars
    cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
    cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));

    // tla+ log vars
    cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
M
Minghao Li 已提交
464
    snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
M
Minghao Li 已提交
465 466 467 468 469 470
    cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);

    // ping timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
    cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
M
Minghao Li 已提交
471
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
M
Minghao Li 已提交
472
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
M
Minghao Li 已提交
473
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
474 475 476
    cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
    cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
M
Minghao Li 已提交
477
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
M
Minghao Li 已提交
478 479 480 481 482 483
    cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);

    // elect timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
    cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
M
Minghao Li 已提交
484
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
M
Minghao Li 已提交
485
    cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
M
Minghao Li 已提交
486
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
487 488 489
    cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
    cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
M
Minghao Li 已提交
490
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
M
Minghao Li 已提交
491 492 493 494 495 496
    cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);

    // heartbeat timer
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
    cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
    cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
M
Minghao Li 已提交
497
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
M
Minghao Li 已提交
498
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
M
Minghao Li 已提交
499
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
M
Minghao Li 已提交
500 501 502
    cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
    cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
M
Minghao Li 已提交
503
    snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
M
Minghao Li 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
    cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);

    // callback
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
    cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
    cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
    cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
    cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
    cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
M
Minghao Li 已提交
521 522 523 524 525 526 527
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
  return pJson;
}

M
Minghao Li 已提交
528 529 530 531 532 533 534
char* syncNode2Str(const SSyncNode* pSyncNode) {
  cJSON* pJson = syncNode2Json(pSyncNode);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
535 536 537 538 539 540 541 542 543 544
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
  if (term > pSyncNode->pRaftStore->currentTerm) {
    raftStoreSetTerm(pSyncNode->pRaftStore, term);
    syncNodeBecomeFollower(pSyncNode);
    raftStoreClearVote(pSyncNode->pRaftStore);
  }
}

void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
545
  // maybe clear leader cache
M
Minghao Li 已提交
546 547 548 549
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache = EMPTY_RAFT_ID;
  }

M
Minghao Li 已提交
550
  // state change
M
Minghao Li 已提交
551 552 553
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
  syncNodeStopHeartbeatTimer(pSyncNode);

M
Minghao Li 已提交
554 555
  // reset elect timer
  syncNodeResetElectTimer(pSyncNode);
M
Minghao Li 已提交
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
}

// TLA+ Spec
// \* Candidate i transitions to leader.
// BecomeLeader(i) ==
//     /\ state[i] = Candidate
//     /\ votesGranted[i] \in Quorum
//     /\ state'      = [state EXCEPT ![i] = Leader]
//     /\ nextIndex'  = [nextIndex EXCEPT ![i] =
//                          [j \in Server |-> Len(log[i]) + 1]]
//     /\ matchIndex' = [matchIndex EXCEPT ![i] =
//                          [j \in Server |-> 0]]
//     /\ elections'  = elections \cup
//                          {[eterm     |-> currentTerm[i],
//                            eleader   |-> i,
//                            elog      |-> log[i],
//                            evotes    |-> votesGranted[i],
//                            evoterLog |-> voterLog[i]]}
//     /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
577
  // state change
M
Minghao Li 已提交
578
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
M
Minghao Li 已提交
579 580

  // set leader cache
M
Minghao Li 已提交
581 582 583
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
M
Minghao Li 已提交
584 585
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
586 587 588 589
    pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
  }

  for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
M
Minghao Li 已提交
590 591
    // maybe overwrite myself, no harm
    // just do it!
M
Minghao Li 已提交
592 593 594
    pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
  }

M
Minghao Li 已提交
595
  // stop elect timer
M
Minghao Li 已提交
596
  syncNodeStopElectTimer(pSyncNode);
M
Minghao Li 已提交
597 598

  // start replicate right now!
M
Minghao Li 已提交
599
  syncNodeReplicate(pSyncNode);
M
Minghao Li 已提交
600 601 602

  // start heartbeat timer
  syncNodeStartHeartbeatTimer(pSyncNode);
M
Minghao Li 已提交
603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
}

void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  assert(voteGrantedMajority(pSyncNode->pVotesGranted));
  syncNodeBecomeLeader(pSyncNode);
}

void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}

void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
  syncNodeBecomeFollower(pSyncNode);
}

void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
  syncNodeBecomeFollower(pSyncNode);
}

// raft vote --------------
M
Minghao Li 已提交
627 628 629

// just called by syncNodeVoteForSelf
// need assert
M
Minghao Li 已提交
630 631 632 633 634 635 636
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  assert(term == pSyncNode->pRaftStore->currentTerm);
  assert(!raftStoreHasVoted(pSyncNode->pRaftStore));

  raftStoreVote(pSyncNode->pRaftStore, pRaftId);
}

M
Minghao Li 已提交
637
// simulate get vote from outside
M
Minghao Li 已提交
638 639 640 641 642 643 644 645 646 647 648 649 650 651
void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
  syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId));

  SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild();
  pMsg->srcId = pSyncNode->myRaftId;
  pMsg->destId = pSyncNode->myRaftId;
  pMsg->term = pSyncNode->pRaftStore->currentTerm;
  pMsg->voteGranted = true;

  voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
  votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
  syncRequestVoteReplyDestroy(pMsg);
}

M
Minghao Li 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  free(serialized);
}

void syncNodePrint2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  free(serialized);
}

void syncNodeLog(SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
  free(serialized);
}

void syncNodeLog2(char* s, SSyncNode* pObj) {
  char* serialized = syncNode2Str(pObj);
  sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
  free(serialized);
}

M
Minghao Li 已提交
679
// ------ local funciton ---------
M
Minghao Li 已提交
680
// enqueue message ----
M
Minghao Li 已提交
681 682
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
683
  if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
M
Minghao Li 已提交
684 685 686 687
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
                                              pSyncNode->pingTimerMS, pSyncNode);
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
688
    syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
M
Minghao Li 已提交
689 690 691
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
692
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
693 694
                 &pSyncNode->pPingTimer);
  } else {
695
    sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
696
           pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
M
Minghao Li 已提交
697 698 699 700 701 702 703 704
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
                                              pSyncNode->electTimerMS, pSyncNode);
M
Minghao Li 已提交
705
    SRpcMsg      rpcMsg;
M
Minghao Li 已提交
706
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
707
    syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
M
Minghao Li 已提交
708 709 710
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
711 712
    // reset timer ms
    pSyncNode->electTimerMS = syncUtilElectRandomMS();
M
Minghao Li 已提交
713
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
714 715
                 &pSyncNode->pPingTimer);
  } else {
716
    sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
M
Minghao Li 已提交
717
           pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
M
Minghao Li 已提交
718 719 720
  }
}

M
Minghao Li 已提交
721 722 723 724 725 726 727 728 729
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
      atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
    SyncTimeout* pSyncMsg =
        syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
                          pSyncNode->heartbeatTimerMS, pSyncNode);
    SRpcMsg rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
M
Minghao Li 已提交
730
    syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
M
Minghao Li 已提交
731 732 733
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

M
Minghao Li 已提交
734
    taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
735 736
                 &pSyncNode->pHeartbeatTimer);
  } else {
M
Minghao Li 已提交
737 738
    sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
           "",
M
Minghao Li 已提交
739 740 741 742
           pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
  }
}

M
Minghao Li 已提交
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759
// on message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  int32_t ret = 0;
  syncPingLog2("==syncNodeOnPingCb==", pMsg);
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  return ret;
}

static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  int32_t ret = 0;
  syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
  return ret;
}
M
Minghao Li 已提交
760

M
Minghao Li 已提交
761 762 763 764 765 766 767 768 769 770
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
//     /\ LET entry == [term  |-> currentTerm[i],
//                      value |-> v]
//            newLog == Append(log[i], entry)
//        IN  log' = [log EXCEPT ![i] = newLog]
//     /\ UNCHANGED <<messages, serverVars, candidateVars,
//                    leaderVars, commitIndex>>
//
M
Minghao Li 已提交
771 772 773 774
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
  int32_t ret = 0;
  syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);

M
Minghao Li 已提交
775 776 777 778 779
  SyncIndex       index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  assert(pEntry != NULL);

M
Minghao Li 已提交
780 781
  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
M
Minghao Li 已提交
782 783

    // start replicate right now!
M
Minghao Li 已提交
784
    syncNodeReplicate(ths);
M
Minghao Li 已提交
785

M
Minghao Li 已提交
786 787 788 789
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
790 791 792 793
    if (ths->pFsm != NULL) {
      if (ths->pFsm->FpPreCommitCb != NULL) {
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
      }
M
Minghao Li 已提交
794 795 796
    }
    rpcFreeCont(rpcMsg.pCont);

M
Minghao Li 已提交
797 798 799
    // only myself, maybe commit
    syncMaybeAdvanceCommitIndex(ths);

M
Minghao Li 已提交
800
  } else {
M
Minghao Li 已提交
801 802 803 804
    // pre commit
    SRpcMsg rpcMsg;
    syncEntry2OriginalRpc(pEntry, &rpcMsg);

M
Minghao Li 已提交
805 806
    if (ths->pFsm != NULL) {
      if (ths->pFsm->FpPreCommitCb != NULL) {
M
Minghao Li 已提交
807
        ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2);
M
Minghao Li 已提交
808
      }
M
Minghao Li 已提交
809 810
    }
    rpcFreeCont(rpcMsg.pCont);
M
Minghao Li 已提交
811 812
  }

M
Minghao Li 已提交
813
  syncEntryDestory(pEntry);
M
Minghao Li 已提交
814
  return ret;
815
}