vnodeSync.c 29.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19 20
#define BATCH_DISABLE 1

21
// Returns true when `type` is one of the message types that must block the
// write queue until applied: table create/alter/drop, tag-value update and
// replica alteration.
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_REPLICA:
      return true;
    default:
      return false;
  }
}
M
Minghao Li 已提交
25

26
// No message type is treated as weak here: the result is always false.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
27

28 29
// For a blocking message type (see vnodeIsMsgBlock): if the vnode is not
// already marked blocked, mark it under pVnode->lock and then wait on
// pVnode->syncSem. Non-blocking message types return immediately.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  const bool shouldWait = !pVnode->blocked;
  if (shouldWait) {
    vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = true;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  // The semaphore wait happens after the mutex is released.
  if (shouldWait) {
    tsem_wait(&pVnode->syncSem);
  }
}

43 44
// Counterpart of vnodeWaitBlockMsg: for a blocking message type, clears
// pVnode->blocked and posts pVnode->syncSem if the vnode is currently blocked.
// Both steps happen while pVnode->lock is held.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64 65 66
// Puts the sync node into standby.
// If the first syncSetStandby() fails with TSDB_CODE_SYN_IS_LEADER, leadership
// is transferred first and standby is attempted once more.
// Returns 0 on success, -1 on any failure.
static int32_t vnodeSetStandBy(SVnode *pVnode) {
  vInfo("vgId:%d, start to set standby", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) == 0) {
    vInfo("vgId:%d, set standby success", TD_VID(pVnode));
    return 0;
  }

  if (terrno != TSDB_CODE_SYN_IS_LEADER) {
    vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  // Still leader: hand leadership over before retrying.
  vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
  if (syncLeaderTransfer(pVnode->sync) != 0) {
    vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
    return -1;
  }
  vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));

  if (syncSetStandby(pVnode->sync) != 0) {
    vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  vInfo("vgId:%d, set standby success", TD_VID(pVnode));
  return 0;
}

S
Shengliang Guan 已提交
84
// Decodes an alter-replica request, builds the matching sync reconfiguration
// message and proposes it.
//
// Returns 0 when the proposal was accepted, TSDB_CODE_INVALID_MSG when the
// request cannot be decoded or carries an out-of-range replica count, -1 when
// the reconfig message cannot be built, otherwise the propose error code
// (terrno is preferred when set). On a propose failure with
// TSDB_CODE_SYN_IS_LEADER a leader transfer is attempted.
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);

  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};

  // The replica count comes from the request; reject values that would index
  // past the fixed-size node table instead of writing out of bounds.
  const int32_t maxNodes = (int32_t)(sizeof(cfg.nodeInfo) / sizeof(cfg.nodeInfo[0]));
  if (req.replica < 0 || req.replica > maxNodes) {
    vError("vgId:%d, failed to alter vnode replica since invalid replica num:%d", TD_VID(pVnode), req.replica);
    terrno = TSDB_CODE_INVALID_MSG;
    return TSDB_CODE_INVALID_MSG;
  }

  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

  SRpcMsg rpcMsg = {.info = pMsg->info};
  if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
    vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
  if (code != 0) {
    if (terrno != 0) code = terrno;

    vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
    if (terrno == TSDB_CODE_SYN_IS_LEADER) {
      if (syncLeaderTransfer(pVnode->sync) != 0) {
        vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
      } else {
        vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
      }
    }
  }

  terrno = code;
  return code;
}

126 127 128 129 130 131 132 133 134 135 136 137 138
// Answers `pMsg` with a TSDB_CODE_RPC_REDIRECT response carrying the retry
// endpoint set obtained from the sync module. The response message type is
// the request type plus one, and pMsg->info.hasEpSet is set before the info
// is copied into the response.
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          retryEpSet = {0};

  syncGetRetryEpSet(pVnode->sync, &retryEpSet);

  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          retryEpSet.numOfEps, retryEpSet.inUse);

  int32_t ep = 0;
  while (ep < retryEpSet.numOfEps) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, ep, retryEpSet.eps[ep].fqdn,
            retryEpSet.eps[ep].port);
    ++ep;
  }

  pMsg->info.hasEpSet = 1;

  SRpcMsg redirectRsp = {0};
  redirectRsp.code = TSDB_CODE_RPC_REDIRECT;
  redirectRsp.info = pMsg->info;
  redirectRsp.msgType = pMsg->msgType + 1;
  tmsgSendRedirectRsp(&redirectRsp, &retryEpSet);
}

143 144 145 146 147 148 149 150 151
// Applies a write message immediately through vnodeProcessWriteMsg() and
// delivers the result: the response is sent when it has an RPC handle,
// otherwise any response payload is released.
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg reply = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &reply) < 0) {
    reply.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (reply.info.handle != NULL) {
    tmsgSendRsp(&reply);
    return;
  }

  // Nobody to answer: drop the payload, if one was produced.
  if (reply.pCont) {
    rpcFreeCont(reply.pCont);
  }
}

// Reports a failed proposal to the sender. TSDB_CODE_SYN_NOT_LEADER becomes a
// redirect; any other code is logged and returned as an error response when
// the message has an RPC handle.
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg errRsp = {.code = code, .info = pMsg->info};
  if (errRsp.info.handle == NULL) return;
  tmsgSendRsp(&errRsp);
}

// Runs an alter-replica request and then releases the message.
// A zero result waits for the blocking message to be applied; a negative
// result is reported through vnodeHandleProposeError(); a positive result is
// treated as a programming error.
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);

  if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else {
    ASSERT(0);
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

190
// Proposes the messages collected in pMsgArr[0..*arrSize) to the sync module,
// handles the outcome, frees every message and resets *arrSize to 0.
//
// With BATCH_DISABLE set only pMsgArr[0] is proposed. A positive propose
// result applies each message directly, zero waits on the last message if it
// is a blocking type, and a negative result reports the error for each
// message.
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  const int32_t count = *arrSize;
  if (count <= 0) return;

#if BATCH_DISABLE
  int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, count);
#endif

  if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsgArr[count - 1]);
  } else if (code > 0) {
    for (int32_t k = 0; k < count; ++k) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[k]);
    }
  } else {
    if (terrno != 0) code = terrno;
    for (int32_t k = 0; k < count; ++k) {
      vnodeHandleProposeError(pVnode, pMsgArr[k], code);
    }
  }

  // Every message is released here regardless of the outcome above.
  for (int32_t k = 0; k < count; ++k) {
    SRpcMsg        *pItem = pMsgArr[k];
    const STraceId *trace = &pItem->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pItem, code);
    rpcFreeCont(pItem->pCont);
    taosFreeQitem(pItem);
  }

  *arrSize = 0;
}

223
// Drains up to `numOfMsgs` items from the vnode-write queue and proposes them
// to the sync module.
//
// Per message:
//  - not restored yet, or the batch arrays could not be allocated: an error
//    response is sent and the message is freed;
//  - pre-processing fails: the error is reported to the sender (previously the
//    message was freed without any response) and the message is freed;
//  - TDMT_VND_ALTER_REPLICA: handled separately by vnodeHandleAlterReplicaReq;
//  - otherwise the message is appended to the batch, which is flushed before
//    and after blocking messages, on the last message, and always when
//    BATCH_DISABLE is set.
// Anything still batched when the loop ends is flushed so that no message is
// left unproposed and unfreed.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      // The check is on the restore flag, so say so in the log.
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_APP_NOT_READY;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Tell the sender about the failure before the message is released.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
      vnodeHandleAlterReplicaReq(pVnode, pMsg);
      continue;
    }

    if (isBlock || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // If the final iteration took one of the `continue` paths, earlier messages
  // may still be batched. vnodeProposeBatchMsg() is a no-op when arrayPos is 0.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

290
// Drains up to `numOfMsgs` items from the vnode-apply queue.
// A message whose code is 0 is applied with vnodeProcessWriteMsg(); a message
// that already carries an error code is not applied. In both cases a waiting
// blocking message is released, the response is sent (or its payload freed
// when there is no RPC handle) and the message itself is freed.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode       *pVnode = pInfo->ahandle;
  const int32_t vgId = pVnode->config.vgId;
  SRpcMsg      *pMsg = NULL;

  for (int32_t n = 0; n < numOfMsgs; ++n) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

326
// Dispatches one sync-module message to the handler matching its type and the
// sync node's snapshot strategy (SYNC_STRATEGY_NO_SNAPSHOT or
// SYNC_STRATEGY_WAL_FIRST).
//
// Returns the handler's result code; -1 when the sync environment is not
// started, the sync node cannot be acquired, or the message type is unknown.
// When the result is non-zero and terrno is still 0, terrno is set to
// TSDB_CODE_SYN_INTERNAL_ERROR. `pRsp` is not used by this function.
//
// Fix over the previous version: four error logs had "msg:%p" in the format
// string without passing pMsg, so the arguments did not match the format.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;

  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroyDeep(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
      code = -1;
    }

  } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
    // use wal first strategy
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
      SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
      syncClientRequestBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
      SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesBatchDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      ASSERT(pSyncMsg != NULL);
      code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
      code = vnodeSetStandBy(pVnode);
      if (code != 0 && terrno != 0) code = terrno;
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
      code = -1;
    }
  }

  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
         code);
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

476
// Enqueues `pMsg` on SYNC_QUEUE. On failure the message payload is freed and
// pMsg->pCont is cleared. Returns the result of tmsgPutToQueue().
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  const int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (ret == 0) return ret;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
484

485
// Sends `pMsg` to the endpoint set `pEpSet`. On failure the message payload is
// freed and pMsg->pCont is cleared. Returns the result of tmsgSendReq().
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) return ret;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
493

494
// FSM callback: fills `pSnapshot` through vnodeGetSnapshot() for the vnode
// stored in pFsm->data. Always returns 0.
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
  return 0;
}

499
// FSM callback invoked when a replica reconfiguration is confirmed.
// Fetches (and removes) the pending response info registered under
// cbMeta.newCfgSeqNum, answers the requester when an RPC handle exists, and
// releases a writer blocked on this message.
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;

  SRpcMsg confirmRsp = {0};
  confirmRsp.msgType = pMsg->msgType;
  confirmRsp.contLen = pMsg->contLen;
  syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &confirmRsp.info);
  confirmRsp.info.conn.applyIndex = cbMeta.index;

  vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
          TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, confirmRsp.info.handle);

  if (confirmRsp.info.handle != NULL) {
    tmsgSendRsp(&confirmRsp);
  }

  vnodePostBlockMsg(pVnode, pMsg);
}

516
// FSM commit callback. Weak messages (cbMeta.isWeak != 0) are ignored here.
//
// On success (cbMeta.code == 0) the message payload is copied, tagged with the
// apply index/term and the pending response info for cbMeta.seqNum, and put on
// APPLY_QUEUE. On failure an error response is sent when the message has an
// RPC handle.
//
// Change over the previous version: a failed enqueue is now logged and the
// copied payload released, matching what vnodeSyncEqMsg() does after a failed
// tmsgPutToQueue(); before, the result was ignored and the copy leaked.
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak == 0) {
    SVnode *pVnode = pFsm->data;

    if (cbMeta.code == 0) {
      SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
      rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
      memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
      syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
      rpcMsg.info.conn.applyIndex = cbMeta.index;
      rpcMsg.info.conn.applyTerm = cbMeta.term;

      vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
            ", weak:%d, code:%d, state:%d %s, type:%s",
            syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
            cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

      int32_t putCode = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
      if (putCode != 0) {
        vError("vgId:%d, commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64 ", code:0x%x",
               syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index, putCode);
        // The queue did not take ownership of the copy; release it here.
        if (rpcMsg.pCont != NULL) {
          rpcFreeCont(rpcMsg.pCont);
          rpcMsg.pCont = NULL;
        }
      }
    } else {
      SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
      vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
             TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
    }
  }
}

545
// FSM pre-commit callback. Only weak messages (cbMeta.isWeak == 1) are handled.
//
// On success (cbMeta.code == 0) the message payload is copied, tagged with the
// apply index/term and the pending response info for cbMeta.seqNum, and put on
// APPLY_QUEUE. On failure an error response is sent when the message has an
// RPC handle.
//
// Change over the previous version: a failed enqueue is now logged and the
// copied payload released, matching what vnodeSyncEqMsg() does after a failed
// tmsgPutToQueue(); before, the result was ignored and the copy leaked.
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak == 1) {
    SVnode *pVnode = pFsm->data;
    vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
           syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
           syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

    if (cbMeta.code == 0) {
      SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
      rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
      memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
      syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
      rpcMsg.info.conn.applyIndex = cbMeta.index;
      rpcMsg.info.conn.applyTerm = cbMeta.term;

      int32_t putCode = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
      if (putCode != 0) {
        vError("vgId:%d, pre-commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64 ", code:0x%x",
               syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index, putCode);
        // The queue did not take ownership of the copy; release it here.
        if (rpcMsg.pCont != NULL) {
          rpcFreeCont(rpcMsg.pCont);
          rpcMsg.pCont = NULL;
        }
      }
    } else {
      SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
      vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
             TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
    }
  }
}

571
// FSM rollback callback: only emits a trace line describing the rolled-back
// entry; no state is changed here.
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
}

M
Minghao Li 已提交
578 579
#define USE_TSDB_SNAPSHOT

580
// FSM callback: opens a snapshot reader over the range given by `pParam`
// (used as an SSnapshotParam) and stores it in *ppReader.
// Returns the result of vnodeSnapReaderOpen().
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pRange = pParam;

  return vnodeSnapReaderOpen(pVnode, pRange->start, pRange->end, (SVSnapReader **)ppReader);
#else
  *ppReader = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
591

592
// FSM callback: closes the snapshot reader `pReader`.
// Returns the result of vnodeSnapReaderClose().
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
  (void)pFsm;  // the vnode is not needed to close a reader
  return vnodeSnapReaderClose(pReader);
#else
  taosMemoryFree(pReader);
  return 0;
#endif
}
S
Shengliang Guan 已提交
602

603
// FSM callback: reads the next snapshot block from `pReader` into *ppBuf and
// its length into *len. Returns the result of vnodeSnapRead().
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
  (void)pFsm;  // the vnode is not needed to read from an open reader
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
#else
  // Stub: produces five 64-byte text blocks, then an empty block.
  static int32_t times = 0;
  if (times++ < 5) {
    *len = 64;
    *ppBuf = taosMemoryMalloc(*len);
    snprintf(*ppBuf, *len, "snapshot block %d", times);
  } else {
    *len = 0;
    *ppBuf = NULL;
  }
  return 0;
#endif
}
S
Shengliang Guan 已提交
621

M
Minghao Li 已提交
622
// FSM callback: waits until the apply queue is empty (polling every 10 ms),
// then opens a snapshot writer over the range given by `pParam` (used as an
// SSnapshotParam) and stores it in *ppWriter.
// Returns the result of vnodeSnapWriterOpen().
//
// Fix over the previous version: the "write vnode snapshot later" log has a
// %d for the item count but the count was never passed.
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
      taosMsleep(10);
    }
  } while (true);

  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
#else
  *ppWriter = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
645

646
// FSM callback: closes the snapshot writer `pWriter`. The second argument
// passed to vnodeSnapWriterClose() is the negation of `isApply`.
// Returns the result of vnodeSnapWriterClose().
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  const int32_t closeCode = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, closeCode);
  return closeCode;
#else
  taosMemoryFree(pWriter);
  return 0;
#endif
}
S
Shengliang Guan 已提交
660

M
Minghao Li 已提交
661
// FSM callback: writes one snapshot block (`pBuf`, `len` bytes) through
// `pWriter`. Returns the result of vnodeSnapWrite().
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;

  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
  const int32_t writeCode = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);

  return writeCode;
#else
  return 0;
#endif
}
S
Shengliang Guan 已提交
672

673 674 675 676
// FSM leader-transfer callback. Currently a placeholder: it reads the vnode
// from pFsm->data and does nothing else.
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;  // unused for now
}

677 678
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
679 680 681 682 683 684 685 686 687 688 689 690

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId);
      taosMsleep(10);
    }
  } while (true);

691 692 693 694
  pVnode->restored = true;
  vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}

695 696 697 698 699
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become follower", pVnode->config.vgId);

  // clear old leader resource
700
  taosThreadMutexLock(&pVnode->lock);
701 702
  if (pVnode->blocked) {
    pVnode->blocked = false;
703
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
704 705
    tsem_post(&pVnode->syncSem);
  }
706
  taosThreadMutexUnlock(&pVnode->lock);
707 708 709 710 711
}

// FSM callback invoked when this node becomes leader. It only logs the
// transition; the unblock logic below is disabled.
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  vDebug("vgId:%d, become leader", pVnode->config.vgId);

  // taosThreadMutexLock(&pVnode->lock);
  // if (pVnode->blocked) {
  //   pVnode->blocked = false;
  //   tsem_post(&pVnode->syncSem);
  // }
  // taosThreadMutexUnlock(&pVnode->lock);
}

721
// Allocates an SSyncFSM bound to `pVnode` and wires every callback to the
// vnode implementation in this file.
// Returns the new FSM, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when
// the allocation fails (previously the NULL result was dereferenced).
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = vnodeSyncReconfig;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

// Opens the sync instance for `pVnode`, storing its state under
// "<path><TD_DIRSEP>sync", and configures the ping (5000 ms), election
// (4000 ms) and heartbeat (700 ms) timers.
// Returns 0 on success, -1 when the FSM cannot be created or syncOpen() fails.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      //.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .isStandBy = pVnode->config.standby,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL state machine to the sync module.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to create sync fsm since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 5000);
  setElectTimerMS(pVnode->sync, 4000);
  setHeartbeatTimerMS(pVnode->sync, 700);
  return 0;
}

// Starts the vnode's sync instance. The vnode's message callbacks are
// registered with the sync module first.
void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);

  syncStart(pVnode->sync);
}

// Stops the vnode's sync instance.
void vnodeSyncClose(SVnode *pVnode) {
  syncStop(pVnode->sync);
}
778

779 780
// Returns true when the sync module reports this node's role as
// TAOS_SYNC_STATE_LEADER.
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return TAOS_SYNC_STATE_LEADER == syncGetMyRole(pVnode->sync);
}

781 782
// Returns true only when the sync module is ready and the vnode has finished
// restoring. When the vnode is not restored, terrno is set to
// TSDB_CODE_APP_NOT_READY.
bool vnodeIsLeader(SVnode *pVnode) {
  if (!syncIsReady(pVnode->sync)) {
    vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
           syncRestoreFinish(pVnode->sync));
    return false;
  }

  if (pVnode->restored) {
    return true;
  }

  vDebug("vgId:%d, vnode not restored", pVnode->config.vgId);
  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}
796 797 798 799 800 801 802 803 804 805 806 807 808 809

// Returns true when the sync module reports the node as ready, or as ready
// for reads; otherwise logs the current role and last/commit indexes and
// returns false.
//
// Fix over the previous version: the indexes were printed with %ld, which is
// only correct where `long` is 64-bit. They are now cast to int64_t and
// printed with PRId64, as the rest of this file does for index values.
bool vnodeIsReadyForRead(SVnode *pVnode) {
  if (syncIsReady(pVnode->sync)) {
    return true;
  }

  if (syncIsReadyForRead(pVnode->sync)) {
    return true;
  }

  vDebug("vgId:%d, vnode not ready for read, state:%s, last:%" PRId64 ", cmt:%" PRId64, pVnode->config.vgId,
         syncGetMyRoleStr(pVnode->sync), (int64_t)syncGetLastIndex(pVnode->sync),
         (int64_t)syncGetCommitIndex(pVnode->sync));
  return false;
}