vnodeSync.c 16.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20 21
/*
 * Report whether `type` is one of the message types treated as "blocking":
 * create-table, alter-confirm and alter-replica.
 */
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_CONFIRM) return true;
  return type == TDMT_VND_ALTER_REPLICA;
}
M
Minghao Li 已提交
22

23
/* Always answers false, whatever the message type is. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
24

25 26
/*
 * Atomically bump pVnode->syncCount by one when `type` is a blocking message
 * (see vnodeIsMsgBlock); any other type is ignored.
 */
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (vnodeIsMsgBlock(type)) {
    int32_t pending = atomic_add_fetch_32(&pVnode->syncCount, 1);
    vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, pending, TMSG_INFO(type));
  }
}
M
Minghao Li 已提交
31

32 33 34 35 36 37
/*
 * If pVnode->syncCount is positive, wait on pVnode->syncSem; otherwise
 * return immediately.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
  int32_t pending = atomic_load_32(&pVnode->syncCount);
  if (pending > 0) {
    vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, pending);
    tsem_wait(&pVnode->syncSem);
  }
}

40 41 42 43 44 45 46 47 48 49 50 51 52
/*
 * Counterpart of vnodeAccumBlockMsg(): for a blocking message type, and only
 * while pVnode->syncCount is positive, decrement the counter and post
 * pVnode->syncSem once it drops to zero or below.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (!vnodeIsMsgBlock(type) || atomic_load_32(&pVnode->syncCount) <= 0) return;

  int32_t remaining = atomic_sub_fetch_32(&pVnode->syncCount, 1);
  vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, remaining, TMSG_INFO(type));
  if (remaining > 0) return;

  tsem_post(&pVnode->syncSem);
}

S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60 61 62 63
/*
 * Put the sync instance of this vnode into standby.
 *
 * The first syncSetStandby() attempt may fail with TSDB_CODE_SYN_IS_LEADER;
 * in that case leadership is transferred first and standby is attempted a
 * second time. Any other failure is reported at once.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  vInfo("vgId:%d, start to set standby", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) != 0) {
    if (terrno != TSDB_CODE_SYN_IS_LEADER) {
      vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
      return -1;
    }

    // Still the leader: hand leadership over, then retry.
    vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
    if (syncLeaderTransfer(pVnode->sync) != 0) {
      vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
      return -1;
    }
    vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));

    if (syncSetStandby(pVnode->sync) != 0) {
      vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
      return -1;
    }
  }

  vInfo("vgId:%d, set standby success", TD_VID(pVnode));
  return 0;
}

S
Shengliang Guan 已提交
81
/*
 * Handle a TDMT_VND_ALTER_REPLICA request: decode it, build a sync
 * configuration from the replica list and propose the reconfiguration.
 *
 * Returns 0 when the proposal was accepted. Returns TSDB_CODE_INVALID_MSG
 * (also stored in terrno) when the request cannot be decoded or carries a
 * replica count that does not fit the sync configuration, -1 when the
 * reconfig message cannot be built, and otherwise the propose error (terrno
 * when it is set), which is written back to terrno.
 */
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);

  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};

  // req.replica comes from the wire; reject counts that would index past cfg.nodeInfo[].
  const int32_t maxReplica = (int32_t)(sizeof(cfg.nodeInfo) / sizeof(cfg.nodeInfo[0]));
  if (req.replica < 0 || req.replica > maxReplica) {
    vError("vgId:%d, invalid replica number %d in alter replica request", TD_VID(pVnode), req.replica);
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      // The propose was refused because this node is the leader: try to hand leadership over.
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
    }
  }

  terrno = code;
  return code;
}

/*
 * Take up to `numOfMsgs` write requests from `qall` and propose each one
 * through the sync module.
 *
 * For every message:
 *   - an alter-replica request goes to vnodeProcessAlterReplicaReq(), anything
 *     else is pre-processed and then proposed;
 *   - on success the message is counted if it is a blocking type;
 *   - on (-1, TSDB_CODE_SYN_NOT_LEADER) a redirect response with the current
 *     endpoint set is sent;
 *   - on any other failure an error response is sent;
 *   - the message content and the queue item are released.
 *
 * After the batch, waits for outstanding blocking messages via vnodeWaitBlockMsg().
 */
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode       *pVnode = pInfo->ahandle;
  const int32_t vgId = pVnode->config.vgId;
  SRpcMsg      *pMsg = NULL;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro
    vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);

    int32_t code;
    if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) {
      code = vnodePreprocessReq(pVnode, pMsg);
      if (code == 0) {
        code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
      } else {
        vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
      }
    } else {
      code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
    }

    if (code == 0) {
      vnodeAccumBlockMsg(pVnode, pMsg->msgType);
    } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
      SEpSet epSet = {0};
      syncGetEpSet(pVnode->sync, &epSet);

      // Skip over ourselves when the endpoint in use is the local node.
      SEp *pInUse = &epSet.eps[epSet.inUse];
      if (pInUse->port == tsServerPort && strcmp(pInUse->fqdn, tsLocalFqdn) == 0) {
        epSet.inUse = (epSet.inUse + 1) % epSet.numOfEps;
      }

      vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, epSet.numOfEps,
              epSet.inUse);
      for (int32_t e = 0; e < epSet.numOfEps; ++e) {
        vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, e, epSet.eps[e].fqdn, epSet.eps[e].port);
      }

      SRpcMsg redirect = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
      tmsgSendRedirectRsp(&redirect, &epSet);
    } else {
      if (terrno != 0) code = terrno;
      vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);

      SRpcMsg failure = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&failure);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  vnodeWaitBlockMsg(pVnode);
}

176
/*
 * Take up to `numOfMsgs` messages from the apply queue batch `qall` and
 * apply each one to the vnode.
 *
 * A message that already carries a non-zero code is not applied; its code is
 * forwarded in the response. After each message the blocking counter is
 * posted (vnodePostBlockMsg), a response is sent when the message has a
 * handle, and the message content and queue item are released.
 */
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode       *pVnode = pInfo->ahandle;
  const int32_t vgId = pVnode->config.vgId;
  SRpcMsg      *pMsg = NULL;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
            pMsg->info.handle);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
    }

    vnodePostBlockMsg(pVnode, pMsg->msgType);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
207 208 209 210 211 212 213 214
/*
 * Dispatch one sync-layer message to the matching sync node callback.
 *
 * The message is decoded according to pMsg->msgType, passed to the
 * corresponding syncNodeOn*Cb() handler and the decoded copy is destroyed.
 * TDMT_SYNC_SET_VNODE_STANDBY is handled locally through vnodeSetStandBy()
 * and answered with a response carrying the result code.
 *
 * pRsp is not used by this function.
 *
 * Returns the handler result. Returns -1 when the sync environment is not
 * started, the sync node cannot be acquired, or the message type is unknown.
 * Whenever a non-zero value is returned and terrno is still 0, terrno is set
 * to TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t ret = 0;

  if (syncEnvIsStart()) {
    SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
    if (pSyncNode == NULL) {
      // An assert alone disappears under NDEBUG and would let NULL reach the callbacks below.
      vError("vgId:%d, failed to acquire sync node, msg type:%d", TD_VID(pVnode), pMsg->msgType);
      if (terrno == 0) {
        terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
      return -1;
    }

    STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro

    do {
      // NOTE(review): vndTick is a plain static shared by every caller; it is not atomic.
      static int64_t vndTick = 0;
      const bool     traceThisMsg = (++vndTick % 10 == 1);  // trace the 1st, 11th, 21st ... message

      // Only build the (heap-allocated) description string when something will use it.
      if (traceThisMsg || gRaftDetailLog) {
        char *syncNodeStr = sync2SimpleStr(pVnode->sync);
        if (traceThisMsg) {
          vGTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
        }
        if (gRaftDetailLog) {
          char logBuf[512] = {0};
          snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType,
                   syncNodeStr);
          syncRpcMsgLog2(logBuf, pMsg);
        }
        taosMemoryFree(syncNodeStr);
      }
    } while (0);

    SRpcMsg *pRpcMsg = pMsg;

    if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
      syncClientRequestDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      ret = vnodeSetStandBy(pVnode);
      if (ret != 0 && terrno != 0) ret = terrno;
      SRpcMsg rsp = {.code = ret, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
      ret = -1;
    }

    syncNodeRelease(pSyncNode);
  } else {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    ret = -1;
  }

  if (ret != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return ret;
}

312
/*
 * Put `pMsg` on the SYNC_QUEUE. When that fails the message content is
 * released here and pMsg->pCont is reset to NULL.
 *
 * Returns the result of tmsgPutToQueue().
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
320

321
/*
 * Send `pMsg` as a request to the endpoint set `pEpSet`. When that fails the
 * message content is released here and pMsg->pCont is reset to NULL.
 *
 * Returns the result of tmsgSendReq().
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
329

330
/*
 * FSM callback that fills `pSnapshot` through vnodeGetSnapshot(), using the
 * vnode stored in pFsm->data. Always returns 0.
 */
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;

  vnodeGetSnapshot(pVnode, pSnapshot);
  return 0;
}

335
/*
 * FSM reconfiguration callback (installed as FpReConfigCb).
 *
 * Retrieves and removes the response info stored for cbMeta.seqNum, sends a
 * response when that info carries a handle, and finally posts the blocking
 * counter for TDMT_VND_ALTER_REPLICA.
 */
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;

  SRpcMsg rsp = {0};
  rsp.msgType = pMsg->msgType;
  rsp.contLen = pMsg->contLen;
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rsp.info);
  rsp.info.conn.applyIndex = cbMeta.index;

  STraceId *trace = (STraceId *)&pMsg->info.traceId;  // consumed by the vGTrace macro
  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rsp.info.handle);

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
}

352
/*
 * FSM commit callback (installed as FpCommitCb).
 *
 * An entry whose index is not greater than the snapshot's lastApplyIndex is
 * only logged. Otherwise the message content is copied, the response info
 * stored for cbMeta.seqNum is attached, and the copy is put on the
 * APPLY_QUEUE.
 *
 * If the copy cannot be allocated or cannot be queued, the entry is dropped:
 * the copy is released and the blocking counter is posted, because the apply
 * side that normally posts it will never see this message.
 * NOTE(review): no error response is sent to the requester on these paths —
 * confirm whether one is expected.
 */
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode   *pVnode = pFsm->data;
  SSnapshot snapshot = {0};
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
  char      logBuf[256] = {0};

  if (pFsm->FpGetSnapshotInfo != NULL) {
    (*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot);
    beginIndex = snapshot.lastApplyIndex;
  }

  if (cbMeta.index <= beginIndex) {
    snprintf(logBuf, sizeof(logBuf),
             "==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
             "beginIndex :%ld\n",
             pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
             beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
    return;
  }

  snprintf(
      logBuf, sizeof(logBuf),
      "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
      pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);

  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    vError("vgId:%d, failed to copy committed msg, type:%s, since out of memory", TD_VID(pVnode),
           TMSG_INFO(pMsg->msgType));
    vnodePostBlockMsg(pVnode, pMsg->msgType);
    return;
  }
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;

  if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
    // Same convention as vnodeSyncEqMsg(): on failure the caller still owns the content.
    vError("vgId:%d, failed to put committed msg into apply queue, type:%s", TD_VID(pVnode),
           TMSG_INFO(pMsg->msgType));
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
    vnodePostBlockMsg(pVnode, pMsg->msgType);
  }
}

388
/*
 * FSM pre-commit callback (installed as FpPreCommitCb). It only formats the
 * callback metadata and logs it together with the message.
 */
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  char text[256] = {0};

  snprintf(text, sizeof(text),
           "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
           cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));

  syncRpcMsgLog2(text, (SRpcMsg *)pMsg);
}

396
/*
 * FSM roll-back callback (installed as FpRollBackCb). It only formats the
 * callback metadata and logs it together with the message.
 */
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  char text[256] = {0};

  snprintf(text, sizeof(text), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));

  syncRpcMsgLog2(text, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
403
/* Placeholder snapshot callback: does nothing and reports success (0). */
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
  (void)pFsm;
  (void)ppReader;
  return 0;
}
S
Shengliang Guan 已提交
404

S
Shengliang Guan 已提交
405
/* Placeholder snapshot callback: does nothing and reports success (0). */
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
  (void)pReader;
  return 0;
}
S
Shengliang Guan 已提交
406

S
Shengliang Guan 已提交
407
/* Placeholder snapshot callback: produces no data and reports success (0). */
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
  (void)pReader;
  (void)ppBuf;
  (void)len;
  return 0;
}
S
Shengliang Guan 已提交
408

S
Shengliang Guan 已提交
409
/* Placeholder snapshot callback: does nothing and reports success (0). */
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) {
  (void)pFsm;
  (void)ppWriter;
  return 0;
}
S
Shengliang Guan 已提交
410

S
Shengliang Guan 已提交
411
/* Placeholder snapshot callback: does nothing and reports success (0). */
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
  (void)pFsm;
  (void)pWriter;
  (void)isApply;
  return 0;
}
S
Shengliang Guan 已提交
412

S
Shengliang Guan 已提交
413
/* Placeholder snapshot callback: ignores the buffer and reports success (0). */
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  (void)pFsm;
  (void)pWriter;
  (void)pBuf;
  (void)len;
  return 0;
}
S
Shengliang Guan 已提交
414

415
/*
 * Allocate the state-machine descriptor handed to the sync module and wire
 * its callbacks to the vnode handlers defined in this file.
 *
 * Returns the new descriptor, or NULL when the allocation fails.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = NULL;
  pFsm->FpReConfigCb = vnodeSyncReconfig;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Open the sync instance of a vnode.
 *
 * The sync data lives in "<path><TD_DIRSEP>sync". On success the handle is
 * stored in pVnode->sync and the ping, elect and heartbeat timers are set to
 * 3000, 500 and 100 (milliseconds, going by the setter names).
 *
 * Returns 0 on success, -1 when the state machine cannot be allocated or
 * syncOpen() fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotEnable = false,
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    vError("vgId:%d, failed to create sync fsm since out of memory", pVnode->config.vgId);
    return -1;
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    // NOTE(review): ownership of syncInfo.pFsm after a failed syncOpen() is not visible here — confirm it is not leaked.
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

/*
 * Start the sync instance of a vnode. The vnode's message callbacks are
 * registered first, then syncStart() is called.
 */
void vnodeSyncStart(SVnode *pVnode) {
  SMsgCb *pMsgCb = &pVnode->msgCb;

  syncSetMsgCb(pVnode->sync, pMsgCb);
  syncStart(pVnode->sync);
}

/* Stop the sync instance of a vnode. */
void vnodeSyncClose(SVnode *pVnode) {
  syncStop(pVnode->sync);
}