vnodeSync.c 26.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20
// Batching switch. When non-zero, vnodeProposeBatchMsg() compiles the single-message syncPropose()
// path and vnodeProposeWriteMsg() flushes after every message; when zero, the syncProposeBatch()
// path is compiled instead.
#define BATCH_DISABLE 1

21
// Returns true for the message types that make the proposer wait on the vnode semaphore
// (see vnodeWaitBlockMsg / vnodePostBlockMsg): table create/alter/drop, tag update and
// replica alteration.
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_REPLICA:
      return true;
    default:
      return false;
  }
}
M
Minghao Li 已提交
25

26
// Always returns false: no message type is currently treated as weak.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
27

28 29
// For a blocking message type, sets pVnode->blockCount and waits on pVnode->syncSem.
// Non-blocking message types return immediately.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  // Not referenced directly below; presumably picked up by the vG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  pVnode->blockCount = 1;
  tsem_wait(&pVnode->syncSem);
}

37 38
// Counterpart of vnodeWaitBlockMsg(): for a blocking message type, clears pVnode->blockCount
// and posts pVnode->syncSem, but only if blockCount is currently set.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  // Not referenced directly below; presumably picked up by the vG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  if (!pVnode->blockCount) {
    return;
  }

  vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
  pVnode->blockCount = 0;
  tsem_post(&pVnode->syncSem);
}

S
Shengliang Guan 已提交
48 49 50 51 52 53 54 55 56 57 58
// Puts the vnode's sync node into standby.
//
// If the first syncSetStandby() fails with TSDB_CODE_SYN_IS_LEADER, leadership is transferred
// with syncLeaderTransfer() and standby is attempted once more.
// Returns 0 on success, -1 on any failure.
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  vInfo("vgId:%d, start to set standby", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  }

  if (terrno != TSDB_CODE_SYN_IS_LEADER) {
    vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  // Still the leader: hand leadership over first, then retry.
  vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
  if (syncLeaderTransfer(pVnode->sync) != 0) {
    vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
    return -1;
  }
  vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) != 0) {
    vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  vInfo("vgId:%d, set standby success", TD_VID(pVnode));
  return 0;
}

S
Shengliang Guan 已提交
76
// Decodes an alter-replica request, builds the matching sync reconfiguration message and
// proposes it through the sync module.
//
// Returns 0 when the proposal was accepted. On failure returns a non-zero code:
// TSDB_CODE_INVALID_MSG for an undecodable or out-of-range request, -1 when the reconfig
// message cannot be built, otherwise terrno (or the raw syncPropose() result if terrno is 0).
// terrno is set to the returned code except in the -1 case, where it is left as set by
// syncReconfigBuild().
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);

  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};

  // The replica count comes from the request payload; refuse values that would index past
  // the end of cfg.nodeInfo in the loop below.
  const int32_t maxReplica = (int32_t)(sizeof(cfg.nodeInfo) / sizeof(cfg.nodeInfo[0]));
  if (req.replica < 0 || req.replica > maxReplica) {
    vError("vgId:%d, failed to alter vnode replica since invalid replica num:%d", TD_VID(pVnode), req.replica);
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      // The transfer result is only logged; the propose error is still what gets returned.
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
    }
  }

  terrno = code;
  return code;
}

118 119 120 121 122 123 124 125 126 127 128 129 130
// Answers pMsg with TSDB_CODE_RPC_REDIRECT, attaching the endpoint set obtained from
// syncGetRetryEpSet(). Also sets pMsg->info.hasEpSet.
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SEpSet retryEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &retryEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          retryEpSet.numOfEps, retryEpSet.inUse);

  int32_t epIdx = 0;
  while (epIdx < retryEpSet.numOfEps) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, epIdx, retryEpSet.eps[epIdx].fqdn,
            retryEpSet.eps[epIdx].port);
    ++epIdx;
  }

  // hasEpSet must be set before the info is copied into the response below.
  pMsg->info.hasEpSet = 1;

  // NOTE(review): msgType + 1 is presumably the response type paired with the request type - confirm.
  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  tmsgSendRedirectRsp(&rsp, &retryEpSet);
}

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
// Applies a write message immediately via vnodeProcessWriteMsg() and replies to the sender.
//
// The response is sent only when the message carries an RPC handle. Otherwise any response
// payload produced while processing is released here, matching what vnodeApplyWriteMsg()
// does in the same situation.
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  } else if (rsp.pCont != NULL) {
    // Nobody to send the response to: free its payload instead of leaking it.
    rpcFreeCont(rsp.pCont);
  }
}

// Reports a failed proposal to the sender: TSDB_CODE_SYN_NOT_LEADER becomes a redirect,
// any other code is logged and returned in a response (if the message has an RPC handle).
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle == NULL) {
    return;
  }
  tmsgSendRsp(&rsp);
}

// Processes an alter-replica request end to end, then frees the message.
//
// A result of 0 waits for the blocking message to be released, a negative result is
// reported through vnodeHandleProposeError(), and a positive result is asserted against.
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);

  if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) {
      code = terrno;
    }
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else {
    ASSERT(0);
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

178
// Proposes the pending messages in pMsgArr[0..*arrSize) to the sync module, handles the
// outcome, frees every message in the array and resets *arrSize to 0.
//
// Outcome handling by propose result:
//   > 0  each message is applied right away via vnodeHandleWriteMsg()
//   == 0 the caller may block on the last message (vnodeWaitBlockMsg)
//   < 0  each message gets an error/redirect reply via vnodeHandleProposeError()
// With BATCH_DISABLE set only pMsgArr[0] is proposed.
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  const int32_t numPending = *arrSize;
  if (numPending <= 0) {
    return;
  }

#if BATCH_DISABLE
  int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, numPending);
#endif

  if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsgArr[numPending - 1]);
  } else if (code > 0) {
    for (int32_t idx = 0; idx < numPending; ++idx) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[idx]);
    }
  } else {
    if (terrno != 0) {
      code = terrno;
    }
    for (int32_t idx = 0; idx < numPending; ++idx) {
      vnodeHandleProposeError(pVnode, pMsgArr[idx], code);
    }
  }

  for (int32_t idx = 0; idx < numPending; ++idx) {
    SRpcMsg        *pCur = pMsgArr[idx];
    const STraceId *trace = &pCur->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pCur, code);
    rpcFreeCont(pCur->pCont);
    taosFreeQitem(pCur);
  }

  *arrSize = 0;
}

211
// Write-queue handler: takes up to numOfMsgs messages from qall, pre-processes each one and
// proposes it to the sync module.
//
// Messages rejected before proposal (vnode not restored, out of memory, pre-process failure)
// are answered through vnodeHandleProposeError() and freed here. Alter-replica requests are
// handled separately by vnodeHandleAlterReplicaReq(). Everything else is staged in a local
// array and flushed with vnodeProposeBatchMsg(), which also frees the staged messages.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      // The rejection reason is the restore state, not leadership; say so in the log.
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_APP_NOT_READY;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // The staging arrays are allocated once up front; if either allocation failed, every
    // message that reaches this point is rejected.
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Reply before freeing, like the other rejection paths, so the sender is not left
      // waiting for a response that never comes.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
      vnodeHandleAlterReplicaReq(pVnode, pMsg);
      continue;
    }

    // A blocking message must go out alone: flush whatever is staged before adding it.
    if (isBlock || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // If the final iteration left through a `continue`, staged messages may still be pending.
  // Flush them so they are proposed and freed; this is a no-op when nothing is staged.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

278
// Apply-queue handler: for each of up to numOfMsgs messages taken from qall, applies it with
// vnodeProcessWriteMsg() (unless it already carries a non-zero code), releases a blocked
// proposer, replies to the sender and frees the message.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode       *pVnode = pInfo->ahandle;
  const int32_t vgId = pVnode->config.vgId;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      // No handle to reply on: drop the response payload.
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

314
// Dispatches one sync-module message to the handler that matches its type and the sync
// node's snapshot strategy.
//
// Returns the handler's result code, or -1 when the sync environment is not started, the
// sync node cannot be acquired, or the message type is not recognised. If the result is
// non-zero and terrno is still 0, terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR.
// pRsp is not used by this function.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;

  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroyDeep(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
      code = -1;
    }

  } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
    // use wal first strategy
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
      SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
      code = -1;
    }
  }

  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
         code);
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

464
// Enqueue callback handed to the sync module: puts pMsg on SYNC_QUEUE.
// On failure the message content is freed and pMsg->pCont is cleared.
// Returns the tmsgPutToQueue() result.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  const int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
472

473
// Send callback handed to the sync module: sends pMsg to pEpSet with tmsgSendReq().
// On failure the message content is freed and pMsg->pCont is cleared.
// Returns the tmsgSendReq() result.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
481

482
// FSM callback: fills pSnapshot from the vnode stored in pFsm->data. Always returns 0.
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
  return 0;
}

487
// FSM callback invoked once a replica reconfiguration is confirmed: looks up the pending
// response info for cbMeta.newCfgSeqNum, replies to the requester if it has an RPC handle,
// then releases a proposer blocked on this message.
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;

  SRpcMsg rsp = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rsp.info);
  rsp.info.conn.applyIndex = cbMeta.index;

  const STraceId *trace = (STraceId *)&pMsg->info.traceId;
  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rsp.info.handle);

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vnodePostBlockMsg(pVnode, pMsg);
}

504
// FSM commit callback. Only non-weak entries (cbMeta.isWeak == 0) are handled here.
//
// On success (cbMeta.code == 0) the message content is copied into a fresh RPC buffer,
// stamped with the apply index/term plus the pending response info, and put on APPLY_QUEUE.
// On error a response carrying cbMeta.code is sent if the message has an RPC handle.
//
// If the copy cannot be allocated the entry is not queued; if the queue rejects it the
// copy is freed. Both cases are logged.
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak != 0) {
    return;
  }

  SVnode *pVnode = pFsm->data;

  if (cbMeta.code != 0) {
    SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
    vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
           TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    // Copying into a NULL buffer would crash; report and give up on this entry instead.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, commit-cb failed to copy msg since out of memory, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    return;
  }
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
  rpcMsg.info.conn.applyTerm = cbMeta.term;

  vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
        ", weak:%d, code:%d, state:%d %s, type:%s",
        syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
        cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

  if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
    // Same cleanup as vnodeSyncEqMsg(): a message the queue did not take is freed by us.
    vError("vgId:%d, commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  }
}

533
// FSM pre-commit callback. Only weak entries (cbMeta.isWeak == 1) are handled here.
//
// On success (cbMeta.code == 0) the message content is copied into a fresh RPC buffer,
// stamped with the apply index/term plus the pending response info, and put on APPLY_QUEUE.
// On error a response carrying cbMeta.code is sent if the message has an RPC handle.
//
// If the copy cannot be allocated the entry is not queued; if the queue rejects it the
// copy is freed. Both cases are logged.
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak != 1) {
    return;
  }

  SVnode *pVnode = pFsm->data;
  vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

  if (cbMeta.code != 0) {
    SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
    vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
           TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
  rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
  if (rpcMsg.pCont == NULL) {
    // Copying into a NULL buffer would crash; report and give up on this entry instead.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, pre-commit-cb failed to copy msg since out of memory, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    return;
  }
  memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
  rpcMsg.info.conn.applyIndex = cbMeta.index;
  rpcMsg.info.conn.applyTerm = cbMeta.term;

  if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
    // Same cleanup as vnodeSyncEqMsg(): a message the queue did not take is freed by us.
    vError("vgId:%d, pre-commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
           syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  }
}

559
// FSM rollback callback: only traces the rolled-back entry, no state is changed here.
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync),
         pFsm,
         cbMeta.index,
         cbMeta.isWeak,
         cbMeta.code,
         cbMeta.state,
         syncUtilState2String(cbMeta.state),
         TMSG_INFO(pMsg->msgType));
}

M
Minghao Li 已提交
566 567
// When defined, the vnodeSnapshot* FSM callbacks below call the vnodeSnapReader*/vnodeSnapWriter*
// functions; otherwise they compile to placeholder implementations.
#define USE_TSDB_SNAPSHOT

568
// FSM callback: opens a snapshot reader over [start, end] taken from pParam (an
// SSnapshotParam) and stores it in *ppReader. Returns the vnodeSnapReaderOpen() result.
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pRange = pParam;
  return vnodeSnapReaderOpen(pVnode, pRange->start, pRange->end, (SVSnapReader **)ppReader);
#else
  *ppReader = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
579

580
// FSM callback: closes the snapshot reader opened by vnodeSnapshotStartRead().
// Returns the vnodeSnapReaderClose() result.
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
  (void)pFsm;  // the vnode is not needed to close a reader
  return vnodeSnapReaderClose(pReader);
#else
  taosMemoryFree(pReader);
  return 0;
#endif
}
S
Shengliang Guan 已提交
590

591
// FSM callback: reads the next snapshot block from pReader into *ppBuf / *len.
// Returns the vnodeSnapRead() result.
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
  (void)pFsm;  // the vnode is not needed to read from an open reader
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
#else
  // Placeholder: hands out five dummy blocks, then signals the end with an empty one.
  static int32_t times = 0;
  if (times++ < 5) {
    *len = 64;
    *ppBuf = taosMemoryMalloc(*len);
    snprintf(*ppBuf, *len, "snapshot block %d", times);
  } else {
    *len = 0;
    *ppBuf = NULL;
  }
  return 0;
#endif
}
S
Shengliang Guan 已提交
609

M
Minghao Li 已提交
610
// FSM callback: waits until the vnode's apply queue is empty, then opens a snapshot writer
// over [start, end] taken from pParam (an SSnapshotParam) and stores it in *ppWriter.
// Returns the vnodeSnapWriterOpen() result.
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;

  // Poll every 10 ms until the apply queue is empty before opening the writer.
  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
      taosMsleep(10);
    }
  } while (true);

  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
#else
  *ppWriter = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
633

634
// FSM callback: closes the snapshot writer. The second argument of vnodeSnapWriterClose()
// is the negation of isApply. Returns the vnodeSnapWriterClose() result.
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId,
        isApply,
        pSnapshot->lastApplyIndex,
        pSnapshot->lastApplyTerm,
        pSnapshot->lastConfigIndex);

  const int32_t closeCode = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, closeCode);
  return closeCode;
#else
  taosMemoryFree(pWriter);
  return 0;
#endif
}
S
Shengliang Guan 已提交
648

M
Minghao Li 已提交
649
// FSM callback: writes one snapshot block of len bytes through pWriter.
// Returns the vnodeSnapWrite() result.
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;

  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
  const int32_t writeCode = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);

  return writeCode;
#else
  return 0;
#endif
}
S
Shengliang Guan 已提交
660

661 662 663 664
// FSM leader-transfer callback. Takes no action for the vnode.
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

665 666 667 668 669 670
// FSM restore-finished callback: marks the vnode as restored, which vnodeProposeWriteMsg()
// and vnodeIsLeader() check before accepting work.
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
  SVnode *pRestored = pFsm->data;

  pRestored->restored = true;
  vDebug("vgId:%d, sync restore finished", pRestored->config.vgId);
}

671
// Allocates an SSyncFSM bound to pVnode and wires every callback to the vnode
// implementations in this file.
//
// Returns the new FSM, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the
// allocation fails.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
  pFsm->FpReConfigCb = vnodeSyncReconfig;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

// Opens the sync instance for pVnode, storing its state under "<path><TD_DIRSEP>sync", and
// sets the ping, election and heartbeat timers.
//
// Returns 0 on success, -1 if the FSM cannot be created or syncOpen() fails.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      //.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,  // supplied later by vnodeSyncStart() through syncSetMsgCb()
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not open sync without an FSM to call back into.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 5000);
  setElectTimerMS(pVnode->sync, 4000);
  setHeartbeatTimerMS(pVnode->sync, 700);
  return 0;
}

// Hands the vnode's message callbacks to the sync instance, then starts it.
void vnodeSyncStart(SVnode *pVnode) {
  const int64_t syncId = pVnode->sync;
  syncSetMsgCb(syncId, &pVnode->msgCb);
  syncStart(syncId);
}

// Stops the vnode's sync instance.
void vnodeSyncClose(SVnode *pVnode) {
  syncStop(pVnode->sync);
}
726 727 728 729 730 731

// Returns true only when syncIsReady() reports ready and the vnode has finished restoring.
// When sync is ready but the restore is not done, terrno is set to TSDB_CODE_APP_NOT_READY.
bool vnodeIsLeader(SVnode *pVnode) {
  if (!syncIsReady(pVnode->sync)) {
    return false;
  }

  if (pVnode->restored) {
    return true;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}