vnodeSync.c 21.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20 21
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA);
}
M
Minghao Li 已提交
22

23
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
S
Shengliang Guan 已提交
24

25 26
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (!vnodeIsMsgBlock(type)) return;
M
Minghao Li 已提交
27

28
  int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1);
29 30
  vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
}
M
Minghao Li 已提交
31

32
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
33
  int32_t count = atomic_load_32(&pVnode->blockCount);
34 35 36 37
  if (count <= 0) return;

  vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
  tsem_wait(&pVnode->syncSem);
M
Minghao Li 已提交
38 39
}

40 41 42
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
  if (!vnodeIsMsgBlock(type)) return;

43
  int32_t count = atomic_load_32(&pVnode->blockCount);
44 45
  if (count <= 0) return;

46
  count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
47 48 49 50 51 52
  vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
  if (count <= 0) {
    tsem_post(&pVnode->syncSem);
  }
}

S
Shengliang Guan 已提交
53 54 55 56 57 58 59 60 61 62 63
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  vInfo("vgId:%d, start to set standby", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  } else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
    vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
64
  vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
S
Shengliang Guan 已提交
65 66 67 68 69 70 71 72 73 74 75
  if (syncLeaderTransfer(pVnode->sync) != 0) {
    vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
    return -1;
  } else {
    vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
  }

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  } else {
76
    vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
S
Shengliang Guan 已提交
77 78 79 80
    return -1;
  }
}

S
Shengliang Guan 已提交
81
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
82 83 84
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
85
    return TSDB_CODE_INVALID_MSG;
86
  }
87 88

  const STraceId *trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
89
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
90

91 92 93 94 95 96 97 98
  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

S
Shengliang Guan 已提交
99 100 101 102 103 104
  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
105 106
  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
S
Shengliang Guan 已提交
107 108 109 110 111 112 113 114 115
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
S
Shengliang Guan 已提交
116 117 118
    }
  }

S
Shengliang Guan 已提交
119
  terrno = code;
S
Shengliang Guan 已提交
120
  return code;
121 122
}

123 124 125 126 127 128 129 130 131 132 133 134 135
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

D
dapan1121 已提交
136
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
137 138 139 140
  tmsgSendRedirectRsp(&rsp, &newEpSet);
}

void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
141
  SVnode  *pVnode = pInfo->ahandle;
142 143 144 145 146 147
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t m = 0; m < numOfMsgs; m++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
148
    const STraceId *trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
149
    vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
150

151
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
152 153
    if (code != 0) {
      vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
154
    } else {
155 156
      if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
        code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
157 158
      } else {
        code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
159
        if (code > 0) {
160
          SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
161
          if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
162
            rsp.code = terrno;
163
            vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
164 165 166 167 168
          }
          if (rsp.info.handle != NULL) {
            tmsgSendRsp(&rsp);
          }
        }
169 170 171 172 173
      }
    }

    if (code == 0) {
      vnodeAccumBlockMsg(pVnode, pMsg->msgType);
174 175
    } else if (code < 0) {
      if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
176
        vnodeRedirectRpcMsg(pVnode, pMsg);
177
      } else {
178 179 180
        if (terrno != 0) code = terrno;
        vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
        SRpcMsg rsp = {.code = code, .info = pMsg->info};
181 182 183
        if (rsp.info.handle != NULL) {
          tmsgSendRsp(&rsp);
        }
184
      }
185
    } else {
186 187
    }

dengyihao's avatar
dengyihao 已提交
188
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
189 190
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
191
  }
S
Shengliang Guan 已提交
192

193
  vnodeWaitBlockMsg(pVnode);
194 195
}

196
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
197
  SVnode  *pVnode = pInfo->ahandle;
198 199 200 201 202 203
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
204
    const STraceId *trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
205 206
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
            pMsg->info.handle);
207

S
Shengliang Guan 已提交
208 209
    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
210
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
S
Shengliang Guan 已提交
211 212 213 214
        rsp.code = terrno;
        vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
      }
    }
215

S
Shengliang Guan 已提交
216 217
    vnodePostBlockMsg(pVnode, pMsg->msgType);
    if (rsp.info.handle != NULL) {
218 219 220
      tmsgSendRsp(&rsp);
    }

dengyihao's avatar
dengyihao 已提交
221
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
222 223
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
224
  }
M
Minghao Li 已提交
225 226
}

227
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
228 229
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
230

231 232 233 234 235
  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }
S
Shengliang Guan 已提交
236

237 238 239 240 241 242
  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }
S
Shengliang Guan 已提交
243

244
#if 1
245 246 247 248 249 250 251 252
  do {
    char          *syncNodeStr = sync2SimpleStr(pVnode->sync);
    static int64_t vndTick = 0;
    if (++vndTick % 10 == 1) {
      vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
    }
    taosMemoryFree(syncNodeStr);
  } while (0);
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
#endif

  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
276 277 278 279 280
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroyDeep(pSyncMsg);
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
306
    } else {
307 308
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
      code = -1;
S
Shengliang Guan 已提交
309 310
    }

311
  } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
    // use wal first strategy
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroyDeep(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
341
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
342 343 344 345
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
346
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
347 348 349 350 351 352 353 354 355 356 357
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
      SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
358 359 360 361 362 363 364 365
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
366 367 368 369 370 371 372 373 374
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
      code = -1;
    }
S
Shengliang Guan 已提交
375 376
  }

377 378
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
S
Shengliang Guan 已提交
379 380
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
381
  return code;
S
Shengliang Guan 已提交
382 383
}

384
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
M
Minghao Li 已提交
385 386 387
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
388
    pMsg->pCont = NULL;
M
Minghao Li 已提交
389 390 391
  }
  return code;
}
M
Minghao Li 已提交
392

393
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
394 395 396 397 398 399 400
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
401

402
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
S
Shengliang Guan 已提交
403
  vnodeGetSnapshot(pFsm->data, pSnapshot);
M
Minghao Li 已提交
404 405 406
  return 0;
}

407
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
408 409
  SVnode *pVnode = pFsm->data;

410
  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
411
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info);
412
  rpcMsg.info.conn.applyIndex = cbMeta.index;
413

414
  const STraceId *trace = (STraceId *)&pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
415 416
  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
S
Shengliang Guan 已提交
417 418 419 420
  if (rpcMsg.info.handle != NULL) {
    tmsgSendRsp(&rpcMsg);
  }

421
  vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
422 423
}

424
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
425
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
426
  vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
427 428
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
429

430 431 432 433 434
  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
M
Minghao Li 已提交
435
  rpcMsg.info.conn.applyTerm = cbMeta.term;
436
  tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
M
Minghao Li 已提交
437 438
}

439
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
440
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
441
  vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
442 443
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
444 445
}

446
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
447
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
448
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
449 450
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
451 452
}

453 454 455
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
456
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
457 458
  return code;
}
S
Shengliang Guan 已提交
459

460 461
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
  SVnode *pVnode = pFsm->data;
H
Hongze Cheng 已提交
462
  int32_t code = vnodeSnapReaderClose(pReader);
463 464
  return code;
}
S
Shengliang Guan 已提交
465

466 467
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SVnode *pVnode = pFsm->data;
H
Hongze Cheng 已提交
468
  int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
469 470
  return code;
}
S
Shengliang Guan 已提交
471

M
Minghao Li 已提交
472 473 474 475 476 477
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
478

M
Minghao Li 已提交
479 480
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
  SVnode *pVnode = pFsm->data;
481
  int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
M
Minghao Li 已提交
482 483
  return code;
}
S
Shengliang Guan 已提交
484

M
Minghao Li 已提交
485 486 487 488 489
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SVnode *pVnode = pFsm->data;
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
  return code;
}
S
Shengliang Guan 已提交
490

491
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
S
Shengliang Guan 已提交
492
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
M
Minghao Li 已提交
493
  pFsm->data = pVnode;
S
Shengliang Guan 已提交
494 495 496
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
497
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
498
  pFsm->FpRestoreFinishCb = NULL;
499
  pFsm->FpReConfigCb = vnodeSyncReconfig;
S
Shengliang Guan 已提交
500 501 502 503 504 505 506
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

M
Minghao Li 已提交
507
  return pFsm;
508 509 510 511
}

int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
M
Minghao Li 已提交
512 513
      .snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
      .batchSize = 10,
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
M
Minghao Li 已提交
540
  syncStart(pVnode->sync);
541 542 543
}

void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
544 545 546 547 548 549 550 551 552 553 554 555 556 557

bool vnodeIsLeader(SVnode *pVnode) {
  if (!syncIsReady(pVnode->sync)) {
    return false;
  }

  // todo
  // if (!pVnode->restored) {
  //   terrno = TSDB_CODE_APP_NOT_READY;
  //   return false;
  // }

  return true;
}