vnodeSync.c 16.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20 21
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA);
}
M
Minghao Li 已提交
22

23
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
S
Shengliang Guan 已提交
24

25 26
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (!vnodeIsMsgBlock(type)) return;
M
Minghao Li 已提交
27

28 29 30
  int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1);
  vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
}
M
Minghao Li 已提交
31

32 33 34 35 36 37
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
  int32_t count = atomic_load_32(&pVnode->syncCount);
  if (count <= 0) return;

  vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
  tsem_wait(&pVnode->syncSem);
M
Minghao Li 已提交
38 39
}

40 41 42 43 44 45 46 47 48 49 50 51 52
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (!vnodeIsMsgBlock(type)) return;

  int32_t count = atomic_load_32(&pVnode->syncCount);
  if (count <= 0) return;

  count = atomic_sub_fetch_32(&pVnode->syncCount, 1);
  vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
  if (count <= 0) {
    tsem_post(&pVnode->syncSem);
  }
}

S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60 61 62 63
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  vInfo("vgId:%d, start to set standby", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  } else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
    vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
64
  vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
S
Shengliang Guan 已提交
65 66 67 68 69 70 71 72 73 74 75
  if (syncLeaderTransfer(pVnode->sync) != 0) {
    vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
    return -1;
  } else {
    vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
  }

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  } else {
76
    vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
S
Shengliang Guan 已提交
77 78 79 80
    return -1;
  }
}

S
Shengliang Guan 已提交
81
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
82 83 84
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
85
    return TSDB_CODE_INVALID_MSG;
86
  }
dengyihao's avatar
dengyihao 已提交
87 88
  STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
89 90 91 92 93 94 95 96
  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

S
Shengliang Guan 已提交
97 98 99 100 101 102
  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
103 104
  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
S
Shengliang Guan 已提交
105 106 107 108 109 110 111 112 113
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
S
Shengliang Guan 已提交
114 115 116
    }
  }

S
Shengliang Guan 已提交
117
  terrno = code;
S
Shengliang Guan 已提交
118
  return code;
119 120 121
}

void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
122
  SVnode  *pVnode = pInfo->ahandle;
123 124 125 126 127 128
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t m = 0; m < numOfMsgs; m++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dengyihao's avatar
dengyihao 已提交
129 130
    STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
131 132

    if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
S
Shengliang Guan 已提交
133
      code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
134 135 136 137 138 139
    } else {
      code = vnodePreprocessReq(pVnode, pMsg);
      if (code != 0) {
        vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
      } else {
        code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
        if (code == 1) {
          do {
            static int32_t cnt = 0;
            if (cnt++ % 1000 == 1) {
              vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg,
                    pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType);
            }
          } while (0);

          SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
          if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
            rsp.code = terrno;
            vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
          }

          if (rsp.info.handle != NULL) {
            tmsgSendRsp(&rsp);
          }
        }
159 160 161 162 163
      }
    }

    if (code == 0) {
      vnodeAccumBlockMsg(pVnode, pMsg->msgType);
M
Minghao Li 已提交
164
    } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
165
      SEpSet newEpSet = {0};
166 167 168
      syncGetRetryEpSet(pVnode->sync, &newEpSet);

      /*
169 170 171 172 173
      syncGetEpSet(pVnode->sync, &newEpSet);
      SEp *pEp = &newEpSet.eps[newEpSet.inUse];
      if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
        newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
      }
174
      */
175

dengyihao's avatar
dengyihao 已提交
176 177
      vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
              newEpSet.inUse);
178
      for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
dengyihao's avatar
dengyihao 已提交
179
        vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
180
      }
dengyihao's avatar
dengyihao 已提交
181
      pMsg->info.hasEpSet = 1;
182 183 184
      SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
      tmsgSendRedirectRsp(&rsp, &newEpSet);
    } else {
185 186 187 188 189 190
      if (code != 1) {
        if (terrno != 0) code = terrno;
        vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
        SRpcMsg rsp = {.code = code, .info = pMsg->info};
        tmsgSendRsp(&rsp);
      }
191 192
    }

dengyihao's avatar
dengyihao 已提交
193
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
194 195
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
196
  }
S
Shengliang Guan 已提交
197

198
  vnodeWaitBlockMsg(pVnode);
199 200
}

201
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
202
  SVnode  *pVnode = pInfo->ahandle;
203 204 205 206 207 208
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dengyihao's avatar
dengyihao 已提交
209 210 211
    STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
            pMsg->info.handle);
212

S
Shengliang Guan 已提交
213 214
    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
215
      if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
S
Shengliang Guan 已提交
216 217 218 219
        rsp.code = terrno;
        vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
      }
    }
220

S
Shengliang Guan 已提交
221 222
    vnodePostBlockMsg(pVnode, pMsg->msgType);
    if (rsp.info.handle != NULL) {
223 224 225
      tmsgSendRsp(&rsp);
    }

dengyihao's avatar
dengyihao 已提交
226
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
227 228
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
229
  }
M
Minghao Li 已提交
230 231
}

S
Shengliang Guan 已提交
232 233 234 235 236 237 238 239
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t ret = 0;

  if (syncEnvIsStart()) {
    SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
    assert(pSyncNode != NULL);

    SMsgHead *pHead = pMsg->pCont;
M
Minghao Li 已提交
240
    STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
241

M
Minghao Li 已提交
242
    do {
243
      char          *syncNodeStr = sync2SimpleStr(pVnode->sync);
M
Minghao Li 已提交
244 245
      static int64_t vndTick = 0;
      if (++vndTick % 10 == 1) {
M
Minghao Li 已提交
246
        vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
M
Minghao Li 已提交
247 248 249 250 251 252 253 254 255
      }
      if (gRaftDetailLog) {
        char logBuf[512] = {0};
        snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType,
                 syncNodeStr);
        syncRpcMsgLog2(logBuf, pMsg);
      }
      taosMemoryFree(syncNodeStr);
    } while (0);
S
Shengliang Guan 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283

    SRpcMsg *pRpcMsg = pMsg;

    if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

284
      ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
S
Shengliang Guan 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
      syncClientRequestDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
      assert(pSyncMsg != NULL);

      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);

    } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      ret = vnodeSetStandBy(pVnode);
      if (ret != 0 && terrno != 0) ret = terrno;
      SRpcMsg rsp = {.code = ret, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
      ret = -1;
    }

    syncNodeRelease(pSyncNode);
  } else {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    ret = -1;
  }

S
Shengliang Guan 已提交
331
  if (ret != 0 && terrno == 0) {
S
Shengliang Guan 已提交
332 333 334 335 336
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return ret;
}

337
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
M
Minghao Li 已提交
338 339 340
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
341
    pMsg->pCont = NULL;
M
Minghao Li 已提交
342 343 344
  }
  return code;
}
M
Minghao Li 已提交
345

346
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
347 348 349 350 351 352 353
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
354

355
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
S
Shengliang Guan 已提交
356
  vnodeGetSnapshot(pFsm->data, pSnapshot);
M
Minghao Li 已提交
357 358 359
  return 0;
}

360
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
361 362
  SVnode *pVnode = pFsm->data;

363
  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
364
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info);
365
  rpcMsg.info.conn.applyIndex = cbMeta.index;
366

dengyihao's avatar
dengyihao 已提交
367 368 369
  STraceId *trace = (STraceId *)&pMsg->info.traceId;
  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
S
Shengliang Guan 已提交
370 371 372 373
  if (rpcMsg.info.handle != NULL) {
    tmsgSendRsp(&rpcMsg);
  }

374
  vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
375 376
}

377
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
378
  SVnode   *pVnode = pFsm->data;
S
Shengliang Guan 已提交
379
  SSnapshot snapshot = {0};
M
Minghao Li 已提交
380
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
S
Shengliang Guan 已提交
381 382
  char      logBuf[256] = {0};

383 384 385 386 387
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
           beginIndex);
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
M
Minghao Li 已提交
388

389 390 391 392 393 394
  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
  tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
M
Minghao Li 已提交
395 396
}

397
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
398
  char logBuf[256] = {0};
M
Minghao Li 已提交
399 400 401 402 403 404
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
           cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

405
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
406
  char logBuf[256] = {0};
M
Minghao Li 已提交
407 408 409 410 411
  snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

412
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; }
S
Shengliang Guan 已提交
413

S
Shengliang Guan 已提交
414
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
S
Shengliang Guan 已提交
415

S
Shengliang Guan 已提交
416
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
S
Shengliang Guan 已提交
417

S
Shengliang Guan 已提交
418
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
S
Shengliang Guan 已提交
419

S
Shengliang Guan 已提交
420
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
S
Shengliang Guan 已提交
421

S
Shengliang Guan 已提交
422
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
S
Shengliang Guan 已提交
423

424
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
S
Shengliang Guan 已提交
425
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
M
Minghao Li 已提交
426
  pFsm->data = pVnode;
S
Shengliang Guan 已提交
427 428 429
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
430
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
431
  pFsm->FpRestoreFinishCb = NULL;
432
  pFsm->FpReConfigCb = vnodeSyncReconfig;
S
Shengliang Guan 已提交
433 434 435 436 437 438 439
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

M
Minghao Li 已提交
440
  return pFsm;
441 442 443 444
}

int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
445
      .snapshotEnable = false,
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
M
Minghao Li 已提交
472
  syncStart(pVnode->sync);
473 474 475
}

void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }