vnodeSync.c 30.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20
// When non-zero, vnodeProposeWriteMsg proposes each write message on its own instead of batching,
// and vnodeProposeBatchMsg calls syncPropose on the first array entry instead of syncProposeBatch.
// Must remain a preprocessor macro: it is tested with `#if`.
#define BATCH_DISABLE 1

21
// Identifies write message types that are proposed one at a time: after such a message is
// proposed, the write thread waits in vnodeWaitBlockMsg until vnodePostBlockMsg releases it.
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_REPLICA:
      return true;
    default:
      return false;
  }
}
M
Minghao Li 已提交
25

26
// No write message type is currently proposed as "weak", so every proposal is delivered through
// the commit callback (isWeak == 0) rather than the pre-commit callback.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
27

28 29
// For blocking message types (see vnodeIsMsgBlock), marks the vnode as blocked and waits on
// pVnode->syncSem until it is posted (vnodePostBlockMsg / vnodeBecomeFollower).
// If the vnode is already marked blocked, returns without waiting.
// The semaphore wait happens after the mutex has been released.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;
  bool            mustWait = false;

  taosThreadMutexLock(&pVnode->lock);
  if (!pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = true;
    mustWait = true;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (mustWait) {
    tsem_wait(&pVnode->syncSem);
  }
}

43 44
// For blocking message types, clears pVnode->blocked and posts pVnode->syncSem (under
// pVnode->lock) if a writer is currently blocked; otherwise does nothing.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64 65 66
// Puts this vnode's sync node into standby.
// If the first attempt fails with TSDB_CODE_SYN_IS_LEADER, leadership is transferred and standby
// is attempted once more. Returns 0 on success, -1 on failure (terrno describes the error).
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  const int32_t vgId = TD_VID(pVnode);
  vInfo("vgId:%d, start to set standby", vgId);

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", vgId);
    return 0;
  }
  if (terrno != TSDB_CODE_SYN_IS_LEADER) {
    vError("vgId:%d, failed to set standby since %s", vgId, terrstr());
    return -1;
  }

  // This node is the leader: hand leadership off first, then retry.
  vInfo("vgId:%d, start to transfer leader", vgId);
  if (syncLeaderTransfer(pVnode->sync) != 0) {
    vError("vgId:%d, failed to transfer leader since:%s", vgId, terrstr());
    return -1;
  }
  vInfo("vgId:%d, transfer leader success", vgId);

  if (syncSetStandby(pVnode->sync) != 0) {
    vError("vgId:%d, failed to set standby after leader transfer since %s", vgId, terrstr());
    return -1;
  }
  vInfo("vgId:%d, set standby success", vgId);
  return 0;
}

S
Shengliang Guan 已提交
84
// Decodes a TDMT_VND_ALTER_REPLICA request (the payload after SMsgHead) into a sync config and
// proposes it as a reconfig entry that carries the request's rpc info.
// Returns 0 when the proposal was accepted. Otherwise returns a non-zero code; terrno is expected
// to describe the error. When the proposal fails with TSDB_CODE_SYN_IS_LEADER, a leader transfer
// is attempted (its outcome is only logged) before the failure is returned.
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  SAlterVnodeReq req = {0};
  // A body shorter than SMsgHead would make `contLen - sizeof(SMsgHead)` wrap around.
  if (pMsg->contLen < (int32_t)sizeof(SMsgHead) ||
      tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);

  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};

  // req.replica is decoded from a network message. Bound it by the capacity of cfg.nodeInfo so the
  // copy loop below cannot write past the stack-allocated config.
  // NOTE(review): this assumes req.replicas holds at least as many entries as cfg.nodeInfo — confirm.
  const int32_t maxReplica = (int32_t)(sizeof(cfg.nodeInfo) / sizeof(cfg.nodeInfo[0]));
  if (req.replica < 0 || req.replica > maxReplica) {
    vError("vgId:%d, invalid replica num:%d in alter vnode replica req, max:%d", TD_VID(pVnode), (int32_t)req.replica,
           maxReplica);
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
    }
  }

  // Restore the propose error: the leader-transfer attempt above may have overwritten terrno.
  terrno = code;
  return code;
}

126 127 128 129 130 131 132 133 134 135 136 137 138
// Answers pMsg with TSDB_CODE_RPC_REDIRECT and attaches the sync layer's retry endpoint set.
// Side effect: sets pMsg->info.hasEpSet before the response info is copied from it.
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  const int32_t   vgId = pVnode->config.vgId;
  SEpSet          newEpSet = {0};

  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
          newEpSet.inUse);
  for (int32_t idx = 0; idx < newEpSet.numOfEps; idx++) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, idx, newEpSet.eps[idx].fqdn, newEpSet.eps[idx].port);
  }

  pMsg->info.hasEpSet = 1;
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  tmsgSendRedirectRsp(&rsp, &newEpSet);
}

143 144 145 146 147 148 149 150 151
// Applies pMsg directly at pMsg->info.conn.applyIndex.
// If the request carries a reply handle, the response is sent; otherwise any response payload
// produced by the apply step is freed.
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    const STraceId *trace = &pMsg->info.traceId;
    rsp.code = terrno;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle == NULL) {
    if (rsp.pCont != NULL) rpcFreeCont(rsp.pCont);
    return;
  }
  tmsgSendRsp(&rsp);
}

// Reports a failed proposal of pMsg.
// TSDB_CODE_SYN_NOT_LEADER is answered with a redirect. Any other code is logged and, when the
// request has a reply handle, returned to the client as the response code.
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle == NULL) return;
  tmsgSendRsp(&rsp);
}

// Proposes an alter-replica request.
// On success the write thread waits in vnodeWaitBlockMsg until the message is posted. On failure
// the client is answered through vnodeHandleProposeError. In every case pMsg (payload and queue
// item) is freed before returning.
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);

  if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  } else {
    ASSERT(0);  // vnodeProcessAlterReplicaReq is not expected to return a positive code
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

190
// Proposes the *arrSize pending messages in pMsgArr to sync, then frees every one of them and
// resets *arrSize to 0. Does nothing when *arrSize <= 0.
// With BATCH_DISABLE only pMsgArr[0] is handed to syncPropose.
// Result handling:
//   > 0  every message is applied immediately via vnodeHandleWriteMsg;
//   == 0 vnodeWaitBlockMsg is called for the last message;
//   < 0  every message is answered via vnodeHandleProposeError.
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  const int32_t count = *arrSize;
  if (count <= 0) return;

#if BATCH_DISABLE
  int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, count);
#endif

  if (code > 0) {
    for (int32_t idx = 0; idx < count; idx++) vnodeHandleWriteMsg(pVnode, pMsgArr[idx]);
  } else if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsgArr[count - 1]);
  } else {
    if (terrno != 0) code = terrno;
    for (int32_t idx = 0; idx < count; idx++) vnodeHandleProposeError(pVnode, pMsgArr[idx], code);
  }

  for (int32_t idx = 0; idx < count; idx++) {
    SRpcMsg        *pDone = pMsgArr[idx];
    const STraceId *trace = &pDone->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pDone, code);
    rpcFreeCont(pDone->pCont);
    taosFreeQitem(pDone);
  }

  *arrSize = 0;
}

223
// Write-queue worker: drains up to numOfMsgs write requests from qall and proposes them to sync.
// A request is rejected, answered through vnodeHandleProposeError and freed when:
//   - the vnode has not finished restoring;
//   - the batching arrays could not be allocated;
//   - vnodePreProcessWriteMsg fails.
// Alter-replica requests go through vnodeHandleAlterReplicaReq. Everything else is accumulated in
// pMsgArr and proposed via vnodeProposeBatchMsg, one message at a time while BATCH_DISABLE is set.
// Every message taken from qall is freed before this function returns.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_APP_NOT_READY;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Answer the client like the other rejection paths; otherwise it waits for a reply that
      // never comes.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
      // Propose anything already batched first so requests keep their queue order.
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
      vnodeHandleAlterReplicaReq(pVnode, pMsg);
      continue;
    }

    if (isBlock || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // If the last item(s) were skipped via `continue`, messages batched earlier would otherwise never
  // be proposed or freed. This call is a no-op when nothing is pending.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

290
// Apply-queue worker. For each message taken from qall it:
//   - runs vnodeProcessWriteMsg at pMsg->info.conn.applyIndex, unless the message already carries
//     a non-zero code;
//   - calls vnodePostBlockMsg to release a writer blocked on this message type;
//   - replies to the client, or frees the response payload when there is no reply handle;
//   - frees the message itself.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont != NULL) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

326 327 328 329 330
// Handles sync control messages (heartbeat and heartbeat reply) for this vnode.
// Returns the handler's result. Returns -1 when the sync env is not started, when the sync node
// cannot be acquired, or for any other message type. On failure terrno is set, defaulting to
// TSDB_CODE_SYN_INTERNAL_ERROR. pRsp is not used.
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  const STraceId *trace = &pMsg->info.traceId;
  const int32_t   vgId = pVnode->config.vgId;

  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", vgId, pMsg);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", vgId, pMsg);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code;
  switch (pMsg->msgType) {
    case TDMT_SYNC_HEARTBEAT: {
      SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
      code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
      syncHeartbeatDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_HEARTBEAT_REPLY: {
      SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
      code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
      syncHeartbeatReplyDestroy(pSyncMsg);
      break;
    }
    default:
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", vgId, pMsg, pMsg->msgType);
      code = -1;
      break;
  }

  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", vgId, pMsg, TMSG_INFO(pMsg->msgType), code);
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

369
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
370 371
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
372

373
  if (!syncEnvIsStart()) {
S
Shengliang Guan 已提交
374
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
375 376 377
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }
S
Shengliang Guan 已提交
378

379 380
  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
S
Shengliang Guan 已提交
381
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
382 383 384
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
S
Shengliang Guan 已提交
385

386
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408

  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
409 410 411 412 413
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroyDeep(pSyncMsg);
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
439
    } else {
S
Shengliang Guan 已提交
440
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
441
      code = -1;
S
Shengliang Guan 已提交
442 443
    }

444
  } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
    // use wal first strategy
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
M
Minghao Li 已提交
470
      syncClientRequestBatchDestroy(pSyncMsg);
471 472 473
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
474
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
475 476 477 478
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
479
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
480 481 482 483 484 485 486 487 488 489 490
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
      SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
491 492 493 494 495 496 497 498
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
499 500 501 502 503 504
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
S
Shengliang Guan 已提交
505
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
506 507
      code = -1;
    }
S
Shengliang Guan 已提交
508 509
  }

510 511
  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
         code);
512 513
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
S
Shengliang Guan 已提交
514 515
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
516
  return code;
S
Shengliang Guan 已提交
517 518
}

519
// Enqueues a sync-generated message onto the vnode's SYNC_QUEUE. Returns 0 on success and
// non-zero otherwise.
// On every failure path, including a NULL msgcb, the payload is freed and pMsg->pCont is reset to
// NULL. Previously a NULL msgcb returned -1 without releasing the payload, unlike the enqueue
// failure path.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t code = -1;
  if (msgcb != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
531

532
// Sends a sync request to pEpSet. On failure the payload is freed and pMsg->pCont is reset to NULL
// before the error code is returned.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
540

541
// FSM callback: fills pSnapshot via vnodeGetSnapshot for the vnode bound to pFsm. Always returns 0.
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
  return 0;
}

S
Shengliang Guan 已提交
546
// FSM reconfig callback. Looks up (and removes) the pending reply info for cbMeta->newCfgSeqNum and
// answers the proposer if a handle exists. It then calls vnodePostBlockMsg to release a writer
// blocked on this message type.
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;
  SRpcMsg         rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};

  syncGetAndDelRespRpc(pVnode->sync, cbMeta->newCfgSeqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta->index;

  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta->seqNum, rpcMsg.info.handle);

  if (rpcMsg.info.handle != NULL) tmsgSendRsp(&rpcMsg);

  vnodePostBlockMsg(pVnode, pMsg);
}

563
// FSM commit callback. Only strong entries (cbMeta.isWeak == 0) are handled; weak entries are
// handled in vnodeSyncPreCommitMsg.
// On success, a copy of pMsg is paired with its pending reply info (looked up by cbMeta.seqNum)
// and with the apply index/term, then put into APPLY_QUEUE.
// On a commit error, the client is answered with cbMeta.code when a reply handle exists.
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak != 0) return;

  SVnode *pVnode = pFsm->data;

  if (cbMeta.code != 0) {
    SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
    vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
           TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    // Do not memcpy into a NULL buffer.
    vError("vgId:%d, commit-cb failed to copy msg since out of memory, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    return;
  }
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
  rpcMsg.info.conn.applyTerm = cbMeta.term;

  vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
        ", weak:%d, code:%d, state:%d %s, type:%s",
        syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
        cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

  if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
    // The queue does not take ownership on failure; vnodeSyncEqMsg frees the payload in the same
    // situation. Release the copy so it does not leak.
    vError("vgId:%d, commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    rpcFreeCont(rpcMsg.pCont);
  }
}

592
// FSM pre-commit callback. Only weak entries (cbMeta.isWeak == 1) are handled; strong entries are
// handled in vnodeSyncCommitMsg.
// On success, a copy of pMsg is paired with its pending reply info (looked up by cbMeta.seqNum)
// and with the apply index/term, then put into APPLY_QUEUE.
// On error, the client is answered with cbMeta.code when a reply handle exists.
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak != 1) return;

  SVnode *pVnode = pFsm->data;
  vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

  if (cbMeta.code != 0) {
    SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
    vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
           TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    // Do not memcpy into a NULL buffer.
    vError("vgId:%d, pre-commit-cb failed to copy msg since out of memory, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    return;
  }
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
  rpcMsg.info.conn.applyTerm = cbMeta.term;

  if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
    // The queue does not take ownership on failure; vnodeSyncEqMsg frees the payload in the same
    // situation. Release the copy so it does not leak.
    vError("vgId:%d, pre-commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    rpcFreeCont(rpcMsg.pCont);
  }
}

618
// FSM rollback callback. It only traces the event (index, weak flag, code, node state, message
// type); no vnode state is modified here.
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
}

M
Minghao Li 已提交
625 626
// When defined, the vnodeSnapshot* FSM callbacks use the real vnode snapshot reader/writer
// (vnodeSnapReader*/vnodeSnapWriter*). When undefined, they fall back to placeholder stubs that
// allocate dummy buffers.
#define USE_TSDB_SNAPSHOT

627
// FSM callback: opens a vnode snapshot reader for the start/end range in the SSnapshotParam
// pointed to by pParam, storing it in *ppReader. Returns the result of vnodeSnapReaderOpen.
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
  return vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
#else
  *ppReader = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
638

639
// FSM callback: closes a reader opened by vnodeSnapshotStartRead and returns the close result.
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
#ifdef USE_TSDB_SNAPSHOT
  return vnodeSnapReaderClose(pReader);
#else
  taosMemoryFree(pReader);
  return 0;
#endif
}
S
Shengliang Guan 已提交
649

650
// FSM callback: reads the next snapshot chunk into *ppBuf / *len via vnodeSnapRead.
// The stub build yields five 64-byte text blocks, then *len = 0 and *ppBuf = NULL.
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
#ifdef USE_TSDB_SNAPSHOT
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
#else
  static int32_t times = 0;
  if (times++ >= 5) {
    *len = 0;
    *ppBuf = NULL;
    return 0;
  }
  *len = 64;
  *ppBuf = taosMemoryMalloc(*len);
  snprintf(*ppBuf, *len, "snapshot block %d", times);
  return 0;
#endif
}
S
Shengliang Guan 已提交
668

M
Minghao Li 已提交
669
// FSM callback: waits, polling every 10 ms, until the vnode's APPLY_QUEUE is empty. It then opens
// a snapshot writer for the start/end range in the SSnapshotParam pointed to by pParam, storing it
// in *ppWriter. Returns the result of vnodeSnapWriterOpen.
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
  const int32_t   vgId = pVnode->config.vgId;

  for (;;) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, vgId, APPLY_QUEUE);
    if (itemSize == 0) break;
    vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", vgId, itemSize);
    taosMsleep(10);
  }
  vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", vgId);

  return vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
#else
  *ppWriter = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
692

693
// FSM callback: closes the snapshot writer, passing !isApply as the second argument of
// vnodeSnapWriterClose (presumably a rollback flag — confirm), and returns the close result.
// pSnapshot is read for logging before the close.
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  const bool rollback = !isApply;
  int32_t    code = vnodeSnapWriterClose(pWriter, rollback, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
  return code;
#else
  taosMemoryFree(pWriter);
  return 0;
#endif
}
S
Shengliang Guan 已提交
707

M
Minghao Li 已提交
708
// FSM callback: writes one received snapshot chunk (pBuf, len bytes) through vnodeSnapWrite and
// returns its result.
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;

  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
  const int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);

  return code;
#else
  return 0;
#endif
}
S
Shengliang Guan 已提交
719

720 721 722 723
/*
 * Sync FSM leader-transfer callback (installed as FpLeaderTransferCb).
 *
 * Currently a no-op for the vnode. The parameters are dictated by the
 * callback signature and are intentionally unused; the former unused
 * `pVnode` local (a dead load of pFsm->data) has been dropped.
 */
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

724 725
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
726 727 728 729 730 731 732

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
S
Shengliang Guan 已提交
733
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
734 735 736 737
      taosMsleep(10);
    }
  } while (true);

738 739
  walApplyVer(pVnode->pWal, pVnode->state.applied);

740 741 742 743
  pVnode->restored = true;
  vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}

744 745 746 747 748
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become follower", pVnode->config.vgId);

  // clear old leader resource
749
  taosThreadMutexLock(&pVnode->lock);
750 751
  if (pVnode->blocked) {
    pVnode->blocked = false;
752
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
753 754
    tsem_post(&pVnode->syncSem);
  }
755
  taosThreadMutexUnlock(&pVnode->lock);
756 757 758 759 760
}

/*
 * Sync FSM callback: this replica became the leader.
 *
 * Only logs the transition. Unlike vnodeBecomeFollower(), it deliberately
 * does not clear pVnode->blocked or post syncSem (that unblock logic was
 * present here only as commented-out code).
 */
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}

770
/*
 * Allocate a zeroed sync state machine (SSyncFSM) for a vnode.
 *
 * pFsm->data points back at pVnode. Every Fp* callback is wired to the
 * matching vnode handler in this file, and callbacks not listed stay NULL
 * because the allocation is zeroed.
 *
 * Returns the new FSM, or NULL if the allocation fails; callers must check.
 * NOTE(review): the FSM is handed to syncOpen() by vnodeSyncOpen(); confirm
 * which side is responsible for freeing it.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    // Out of memory: report failure instead of dereferencing NULL below.
    return NULL;
  }

  pFsm->data = pVnode;

  // Log replication.
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;

  // State / role transitions.
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = vnodeSyncReconfig;

  // Snapshot transfer.
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Create the sync node for a vnode.
 *
 * Builds an SSyncInfo from the vnode's config and WAL and sets the sync
 * directory to "<path><TD_DIRSEP>sync". It then attaches a freshly built
 * vnode FSM, opens the node with syncOpen() and sets its ping, election and
 * heartbeat timers. msgcb is left NULL here; vnodeSyncStart() attaches the
 * vnode's message callbacks later.
 *
 * Returns 0 on success. Returns -1 if the sync path does not fit in
 * syncInfo.path, if the FSM cannot be allocated, or if syncOpen() fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  // A truncated path would silently point sync at the wrong directory.
  int len = snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  if (len < 0 || (size_t)len >= sizeof(syncInfo.path)) {
    vError("vgId:%d, failed to open sync since path too long", pVnode->config.vgId);
    return -1;
  }

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    vError("vgId:%d, failed to open sync since failed to create fsm", pVnode->config.vgId);
    return -1;
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    // NOTE(review): it is not visible here whether syncOpen() frees
    // syncInfo.pFsm on failure; confirm ownership to rule out a leak.
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  // Timer periods, in milliseconds.
  setPingTimerMS(pVnode->sync, 5000);
  setElectTimerMS(pVnode->sync, 4000);
  setHeartbeatTimerMS(pVnode->sync, 700);
  return 0;
}

/*
 * Start the vnode's sync node.
 *
 * The vnode's message-callback table (pVnode->msgCb) is registered with the
 * sync node first; syncStart() is called afterwards.
 */
void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);

  syncStart(pVnode->sync);
}

/* Stop the vnode's sync node via syncStop(). */
void vnodeSyncClose(SVnode *pVnode) {
  syncStop(pVnode->sync);
}
827

828 829
/*
 * Report whether the sync node's current role is TAOS_SYNC_STATE_LEADER.
 * Unlike vnodeIsLeader(), this does not check readiness or restore status.
 */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  const bool isLeader = (syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}

830 831
/*
 * Report whether this vnode can act as leader. Two conditions must hold:
 *   1. syncIsReady() reports the sync node as ready.
 *   2. pVnode->restored has been set (done by vnodeRestoreFinish()).
 *
 * If the sync node is not ready, this function returns false without touching
 * terrno itself (NOTE(review): confirm syncIsReady() sets it).
 * If the vnode is not restored, terrno is set to TSDB_CODE_APP_NOT_READY.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  const bool syncReady = syncIsReady(pVnode->sync);
  if (!syncReady) {
    vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
           syncRestoreFinish(pVnode->sync));
    return false;
  }

  const bool restored = pVnode->restored;
  if (restored) {
    return true;
  }

  vDebug("vgId:%d, vnode not restored", pVnode->config.vgId);
  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}