vnodeSync.c 24.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
#include "vnd.h"

// When non-zero, vnodeProposeBatchMsg() proposes via syncPropose() on the first queued message only,
// and vnodeProposeWriteMsg() flushes after every message instead of accumulating a batch.
#define BATCH_DISABLE 1

21
/**
 * Tell whether a message type is a "blocking" one.
 *
 * Blocking types are create/alter/drop table and update-tag-value. Callers in this
 * file use the result to decide whether to wait on / post pVnode->syncSem.
 *
 * @param type message type to classify
 * @return true for the four blocking types, false otherwise
 */
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
         (type == TDMT_VND_UPDATE_TAG_VAL);
}
M
Minghao Li 已提交
25

26
// Weak-message classifier: currently no message type is weak, so the answer is always false.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;  // intentionally ignored
  return false;
}
S
Shengliang Guan 已提交
27

28 29
/**
 * Wait on pVnode->syncSem after a blocking message has been handed to sync.
 *
 * Does nothing unless vnodeIsMsgBlock() accepts the message type. If the vnode is not
 * yet marked blocked, it is marked under pVnode->lock, the lock is released, and the
 * caller waits on the semaphore. If it is already marked blocked, the function returns
 * without waiting.
 *
 * @param pVnode vnode whose lock / blocked flag / semaphore are used
 * @param pMsg   message that was proposed (only msgType and trace id are read)
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (vnodeIsMsgBlock(pMsg->msgType)) {
    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    taosThreadMutexLock(&pVnode->lock);
    if (!pVnode->blocked) {
      vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
      pVnode->blocked = true;
      // Release the lock before sleeping so the posting side can take it.
      taosThreadMutexUnlock(&pVnode->lock);
      tsem_wait(&pVnode->syncSem);
    } else {
      taosThreadMutexUnlock(&pVnode->lock);
    }
  }
}

43 44
/**
 * Release a waiter parked by vnodeWaitBlockMsg().
 *
 * Does nothing unless vnodeIsMsgBlock() accepts the message type. Under pVnode->lock,
 * if the vnode is marked blocked, the flag is cleared and pVnode->syncSem is posted.
 *
 * @param pVnode vnode whose lock / blocked flag / semaphore are used
 * @param pMsg   message being applied (only msgType and trace id are read)
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (vnodeIsMsgBlock(pMsg->msgType)) {
    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    taosThreadMutexLock(&pVnode->lock);
    if (pVnode->blocked) {
      vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
      pVnode->blocked = false;
      tsem_post(&pVnode->syncSem);
    }
    taosThreadMutexUnlock(&pVnode->lock);
  }
}

56 57 58 59 60 61 62 63 64 65 66 67 68
/**
 * Answer a request with TSDB_CODE_RPC_REDIRECT and the endpoint set from sync.
 *
 * The endpoint set is obtained from syncGetRetryEpSet() and traced. pMsg->info.hasEpSet
 * is set to 1 before the info is copied into the response; the response message type is
 * the request type plus one.
 *
 * @param pVnode vnode that received the request
 * @param pMsg   request being redirected (its info.hasEpSet field is modified)
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  tmsgSendRedirectRsp(&rsp, &newEpSet);
}

73 74 75 76 77 78 79 80 81
/**
 * Apply a write message right away and answer the requester.
 *
 * Calls vnodeProcessWriteMsg() with the message's applyIndex. On failure the response
 * code becomes terrno and an error is logged. The response is sent when it carries an
 * RPC handle; otherwise any response payload is released.
 *
 * @param pVnode vnode that applies the message
 * @param pMsg   write message to apply (not freed here)
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  } else {
    // Nobody to reply to: drop whatever payload the processing step produced.
    if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }
  }
}

/**
 * React to a failed proposal.
 *
 * TSDB_CODE_SYN_NOT_LEADER is turned into a redirect. Any other code is logged and, when
 * the request carries an RPC handle, returned to the requester as the response code.
 *
 * @param pVnode vnode that tried to propose
 * @param pMsg   request that could not be proposed (not freed here)
 * @param code   error code describing the failure
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle == NULL) {
    return;
  }
  tmsgSendRsp(&rsp);
}

102
/**
 * Propose the queued messages to sync, handle the outcome, then free them.
 *
 * With BATCH_DISABLE set, only pMsgArr[0] / pIsWeakArr[0] is passed to syncPropose();
 * otherwise the whole array goes to syncProposeBatch(). Outcome handling:
 *   - code > 0 : every queued message is applied immediately via vnodeHandleWriteMsg();
 *   - code == 0: vnodeWaitBlockMsg() is called for the last queued message;
 *   - code < 0 : every queued message goes through vnodeHandleProposeError(), using
 *                terrno as the code when terrno is non-zero.
 * Afterwards every queued message's payload and queue item are freed and *arrSize is
 * reset to 0. Returns immediately when *arrSize <= 0.
 *
 * @param pVnode     vnode that owns the sync handle
 * @param pMsgArr    queued messages
 * @param pIsWeakArr weak flag for each queued message
 * @param arrSize    in: number of queued messages; out: 0
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;

#if BATCH_DISABLE
  int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
#endif

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]);
  } else {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

135
/**
 * Drain the vnode-write queue and propose each message to sync.
 *
 * For every item taken from `qall`:
 *   - if the vnode has not finished restoring, the request is rejected with
 *     TSDB_CODE_APP_NOT_READY and freed;
 *   - if the batch arrays could not be allocated, it is rejected with
 *     TSDB_CODE_OUT_OF_MEMORY and freed;
 *   - if vnodePreProcessWriteMsg() fails, the error is reported to the requester through
 *     vnodeHandleProposeError() and the message is freed;
 *   - otherwise it is appended to the batch, which is flushed through
 *     vnodeProposeBatchMsg() before and after a blocking message, on the last item, and
 *     on every item while BATCH_DISABLE is set.
 *
 * @param pInfo     queue info; pInfo->ahandle is the SVnode
 * @param qall      batch of queue items to read from
 * @param numOfMsgs number of items expected in `qall`
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_APP_NOT_READY;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Tell the requester about the failure instead of silently dropping the request,
      // the same way the other rejection paths above do.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // A blocking message must not share a batch with earlier messages: flush them first.
    if (isBlock || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // If the final item was skipped or rejected above, earlier messages may still be pending
  // in the batch. This is a no-op when nothing is queued (arrayPos == 0).
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

197
/**
 * Drain the vnode-apply queue and apply each message.
 *
 * For every item taken from `qall`: if the message's own code is 0 it is applied through
 * vnodeProcessWriteMsg() (a failure stores terrno in the response code); a waiter parked
 * by a blocking message is then released via vnodePostBlockMsg(); the response is sent
 * when it carries an RPC handle, otherwise its payload is freed; finally the message
 * payload and queue item are freed.
 *
 * @param pInfo     queue info; pInfo->ahandle is the SVnode
 * @param qall      batch of queue items to read from
 * @param numOfMsgs number of items expected in `qall`
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
        rsp.code = terrno;
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
      }
    }

    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

233 234 235 236 237
/**
 * Dispatch a sync control message (heartbeat / heartbeat reply) to the sync node.
 *
 * @param pVnode vnode whose sync handle is used
 * @param pMsg   incoming RPC message
 * @param pRsp   not used by this function
 * @return the handler's result code; -1 when the sync environment is not started, the
 *         sync node cannot be acquired, or the message type is not handled here. When the
 *         result is non-zero and terrno is still 0, terrno is set to
 *         TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros

  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  // NOTE(review): unlike vnodeProcessSyncMsg(), the decoded message is not checked for NULL here.
  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);

  } else {
    vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
    code = -1;
  }

  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
         code);
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

276
/**
 * Dispatch a sync protocol message to the matching sync-node handler.
 *
 * Each supported message type is decoded from the RPC message, passed to its handler,
 * and the decoded object is destroyed afterwards.
 *
 * @param pVnode vnode whose sync handle is used
 * @param pMsg   incoming RPC message
 * @param pRsp   not used by this function
 * @return the handler's result code; -1 when the sync environment is not started, the
 *         sync node cannot be acquired, or the message type is not handled here. When the
 *         result is non-zero and terrno is still 0, terrno is set to
 *         TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t         code = 0;
  const STraceId *trace = &pMsg->info.traceId;  // read by the vG* log macros

  if (!syncEnvIsStart()) {
    vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  if (pSyncNode == NULL) {
    vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnTimer(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnPing(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
    syncClientRequestDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    ASSERT(pSyncMsg != NULL);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    // NOTE(review): the two snapshot branches do not assert on the decoded message like the branches above.
    SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);

  } else {
    vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
    code = -1;
  }

  vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
         code);
  syncNodeRelease(pSyncNode);
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

367
/**
 * Enqueue a sync control message on SYNC_CTRL_QUEUE.
 *
 * @param msgcb message callbacks; must provide putToQueueFp
 * @param pMsg  message to enqueue; must have a payload
 * @return 0 on success; -1 for a NULL message/payload or unusable callbacks; otherwise
 *         the non-zero code from tmsgPutToQueue(). Whenever enqueuing does not happen
 *         and a payload exists, the payload is freed and pMsg->pCont is set to NULL.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return -1;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

386
/**
 * Enqueue a sync message on SYNC_QUEUE.
 *
 * @param msgcb message callbacks; must provide putToQueueFp
 * @param pMsg  message to enqueue; must have a payload
 * @return 0 on success; -1 for a NULL message/payload or unusable callbacks; otherwise
 *         the non-zero code from tmsgPutToQueue(). Whenever enqueuing does not happen
 *         and a payload exists, the payload is freed and pMsg->pCont is set to NULL.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return -1;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
404

405
/**
 * Send a sync message to the given endpoint set.
 *
 * @param pEpSet destination endpoints
 * @param pMsg   message to send
 * @return the code from tmsgSendReq(); when it is non-zero the payload is freed and
 *         pMsg->pCont is set to NULL.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
413

414
/**
 * FSM callback: fill `pSnapshot` by calling vnodeGetSnapshot() on the vnode stored in
 * pFsm->data.
 *
 * @return always 0
 */
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  vnodeGetSnapshot(pFsm->data, pSnapshot);
  return 0;
}

S
Shengliang Guan 已提交
419
// FSM reconfiguration callback: intentionally a no-op for vnodes.
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}
420

421
/**
 * FSM commit callback for non-weak entries (cbMeta.isWeak == 0).
 *
 * On success (cbMeta.code == 0) the payload is copied into a fresh RPC message, the
 * pending response info is fetched with syncGetAndDelRespRpc(), the apply index/term are
 * recorded, and the message is put on APPLY_QUEUE. If enqueuing fails the copied payload
 * is released, following the same convention as the other enqueue helpers in this file.
 * On failure the error code is sent back when the original message carries an RPC handle.
 *
 * @param pFsm   state machine; pFsm->data is the SVnode
 * @param pMsg   committed message
 * @param cbMeta commit metadata (index, term, seqNum, code, state, isWeak)
 */
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak == 0) {
    SVnode *pVnode = pFsm->data;

    if (cbMeta.code == 0) {
      SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
      rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
      memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
      syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
      rpcMsg.info.conn.applyIndex = cbMeta.index;
      rpcMsg.info.conn.applyTerm = cbMeta.term;

      vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
            ", weak:%d, code:%d, state:%d %s, type:%s",
            syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
            cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

      if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
        // The queue did not take the message, so the copied payload would otherwise leak.
        vError("vgId:%d, commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
               syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
        rpcFreeCont(rpcMsg.pCont);
        rpcMsg.pCont = NULL;
      }
    } else {
      SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
      vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
             TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
    }
  }
}

450
/**
 * FSM pre-commit callback for weak entries (cbMeta.isWeak == 1).
 *
 * On success (cbMeta.code == 0) the payload is copied into a fresh RPC message, the
 * pending response info is fetched with syncGetAndDelRespRpc(), the apply index/term are
 * recorded, and the message is put on APPLY_QUEUE. If enqueuing fails the copied payload
 * is released, following the same convention as the other enqueue helpers in this file.
 * On failure the error code is sent back when the original message carries an RPC handle.
 *
 * @param pFsm   state machine; pFsm->data is the SVnode
 * @param pMsg   message being pre-committed
 * @param cbMeta callback metadata (index, term, seqNum, code, state, isWeak)
 */
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  if (cbMeta.isWeak == 1) {
    SVnode *pVnode = pFsm->data;
    vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
           syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
           syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));

    if (cbMeta.code == 0) {
      SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
      rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
      memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
      syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
      rpcMsg.info.conn.applyIndex = cbMeta.index;
      rpcMsg.info.conn.applyTerm = cbMeta.term;
      if (tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg) != 0) {
        // The queue did not take the message, so the copied payload would otherwise leak.
        vError("vgId:%d, pre-commit-cb failed to put msg into apply queue, type:%s, index:%" PRId64,
               syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), cbMeta.index);
        rpcFreeCont(rpcMsg.pCont);
        rpcMsg.pCont = NULL;
      }
    } else {
      SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
      vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
             TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
      if (rsp.info.handle != NULL) {
        tmsgSendRsp(&rsp);
      }
    }
  }
}

476
/**
 * FSM rollback callback: only traces the callback metadata; no state is changed here.
 *
 * @param pFsm   state machine; pFsm->data is the SVnode
 * @param pMsg   message being rolled back (only msgType is read)
 * @param cbMeta callback metadata that is logged
 */
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
         syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
}

M
Minghao Li 已提交
483 484
#define USE_TSDB_SNAPSHOT

485
/**
 * FSM callback: open a snapshot reader.
 *
 * With USE_TSDB_SNAPSHOT defined, opens a vnode snapshot reader over
 * [pSnapshotParam->start, pSnapshotParam->end] and stores it in *ppReader. Otherwise a
 * 32-byte placeholder buffer is allocated as the reader.
 *
 * @param pFsm     state machine; pFsm->data is the SVnode
 * @param pParam   pointer to an SSnapshotParam
 * @param ppReader out: the opened reader
 * @return the code from vnodeSnapReaderOpen(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
  return code;
#else
  *ppReader = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
496

497
/**
 * FSM callback: close a snapshot reader opened by vnodeSnapshotStartRead().
 *
 * @param pFsm    state machine (not used here)
 * @param pReader reader to close
 * @return the code from vnodeSnapReaderClose(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT
  int32_t code = vnodeSnapReaderClose(pReader);
  return code;
#else
  taosMemoryFree(pReader);
  return 0;
#endif
}
S
Shengliang Guan 已提交
507

508
/**
 * FSM callback: read the next snapshot block.
 *
 * With USE_TSDB_SNAPSHOT defined, delegates to vnodeSnapRead(). The placeholder build
 * hands out five 64-byte text blocks in total (tracked by a function-static counter) and
 * then reports an empty block (*len == 0, *ppBuf == NULL).
 *
 * @param pFsm    state machine (not used here)
 * @param pReader reader from vnodeSnapshotStartRead()
 * @param ppBuf   out: block buffer
 * @param len     out: block length
 * @return the code from vnodeSnapRead(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT
  int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
  return code;
#else
  static int32_t times = 0;
  if (times++ < 5) {
    *len = 64;
    *ppBuf = taosMemoryMalloc(*len);
    snprintf(*ppBuf, *len, "snapshot block %d", times);
  } else {
    *len = 0;
    *ppBuf = NULL;
  }
  return 0;
#endif
}
S
Shengliang Guan 已提交
526

M
Minghao Li 已提交
527
/**
 * FSM callback: open a snapshot writer.
 *
 * With USE_TSDB_SNAPSHOT defined, first polls the apply queue every 10 ms until it is
 * empty, then opens a vnode snapshot writer over [pSnapshotParam->start,
 * pSnapshotParam->end]. Otherwise a 32-byte placeholder buffer is allocated as the writer.
 *
 * @param pFsm     state machine; pFsm->data is the SVnode
 * @param pParam   pointer to an SSnapshotParam
 * @param ppWriter out: the opened writer
 * @return the code from vnodeSnapWriterOpen(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;

  // Wait for pending apply-queue items to drain before starting the snapshot write.
  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
      taosMsleep(10);
    }
  } while (true);

  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
#else
  *ppWriter = taosMemoryMalloc(32);
  return 0;
#endif
}
S
Shengliang Guan 已提交
550

551
/**
 * FSM callback: close a snapshot writer opened by vnodeSnapshotStartWrite().
 *
 * With USE_TSDB_SNAPSHOT defined, logs the snapshot position and calls
 * vnodeSnapWriterClose() with `!isApply` as its second argument.
 *
 * @param pFsm      state machine; pFsm->data is the SVnode
 * @param pWriter   writer to close
 * @param isApply   whether the snapshot should be applied; passed inverted to the close call
 * @param pSnapshot snapshot descriptor (index / term / config index are logged)
 * @return the code from vnodeSnapWriterClose(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
  return code;
#else
  taosMemoryFree(pWriter);
  return 0;
#endif
}
S
Shengliang Guan 已提交
565

M
Minghao Li 已提交
566
/**
 * FSM callback: write one snapshot block.
 *
 * @param pFsm    state machine; pFsm->data is the SVnode
 * @param pWriter writer from vnodeSnapshotStartWrite()
 * @param pBuf    block data
 * @param len     block length in bytes
 * @return the code from vnodeSnapWrite(), or 0 in the placeholder build
 */
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
  return code;
#else
  return 0;
#endif
}
S
Shengliang Guan 已提交
577

S
Shengliang Guan 已提交
578
// FSM leader-transfer callback: intentionally a no-op for vnodes.
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}
579

580 581
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
582 583 584

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
585
    if (itemSize == 0) {
586 587 588
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
S
Shengliang Guan 已提交
589
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
590 591 592 593
      taosMsleep(10);
    }
  } while (true);

594 595
  walApplyVer(pVnode->pWal, pVnode->state.applied);

596 597 598 599
  pVnode->restored = true;
  vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}

600 601 602 603 604
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become follower", pVnode->config.vgId);

  // clear old leader resource
605
  taosThreadMutexLock(&pVnode->lock);
606 607
  if (pVnode->blocked) {
    pVnode->blocked = false;
608
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
609 610
    tsem_post(&pVnode->syncSem);
  }
611
  taosThreadMutexUnlock(&pVnode->lock);
612 613 614 615 616
}

/**
 * FSM callback: this node became the leader. Currently it only logs the transition;
 * the unblock logic below is compiled out.
 *
 * @param pFsm state machine; pFsm->data is the SVnode
 */
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);

#if 0
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
#endif
}

628
/**
 * Allocate an SSyncFSM bound to `pVnode` and wire up all vnode callbacks.
 *
 * @param pVnode vnode stored in the FSM's `data` field
 * @return the new FSM, or NULL (with terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Without this check the assignments below would dereference NULL.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = vnodeSyncReconfig;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/**
 * Open the sync instance for a vnode.
 *
 * Builds an SSyncInfo from the vnode config (WAL-first snapshot strategy, batch size 1,
 * sync directory "<path><TD_DIRSEP>sync"), attaches a freshly created FSM, opens sync and
 * sets the ping / elect / heartbeat timers to 5000 / 4000 / 700 ms.
 *
 * @param pVnode vnode to open sync for; pVnode->sync receives the handle
 * @param path   vnode directory used as the parent of the sync directory
 * @return 0 on success, -1 if the FSM cannot be created or syncOpen() fails
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
      .FpEqCtrlMsg = vnodeSyncEqCtrlMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand sync a missing state machine.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 5000);
  setElectTimerMS(pVnode->sync, 4000);
  setHeartbeatTimerMS(pVnode->sync, 700);
  return 0;
}

/**
 * Start sync for a vnode: registers the vnode's message callbacks with sync, then starts it.
 *
 * @param pVnode vnode whose sync handle was opened by vnodeSyncOpen()
 */
void vnodeSyncStart(SVnode *pVnode) {
  vDebug("vgId:%d, start sync", pVnode->config.vgId);
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
  syncStart(pVnode->sync);
}

S
Shengliang Guan 已提交
691 692 693 694
/**
 * Stop the sync instance of a vnode.
 *
 * @param pVnode vnode whose sync handle is stopped
 */
void vnodeSyncClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;
  vDebug("vgId:%d, close sync", vgId);

  syncStop(pVnode->sync);
}
695

696 697
/**
 * Report whether sync currently considers this vnode the leader.
 *
 * @return true when syncGetMyRole() equals TAOS_SYNC_STATE_LEADER
 */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  bool isLeader = (syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}

698 699
/**
 * Report whether this vnode can currently serve as leader.
 *
 * @return false when syncIsReady() fails, or when the vnode has not finished restoring
 *         (terrno is then set to TSDB_CODE_APP_NOT_READY); true otherwise.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  if (!syncIsReady(pVnode->sync)) {
    // NOTE(review): terrno is not set on this path in this function — presumably syncIsReady() does; confirm.
    vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
           syncRestoreFinish(pVnode->sync));
    return false;
  }

  if (!pVnode->restored) {
    vDebug("vgId:%d, vnode not restored", pVnode->config.vgId);
    terrno = TSDB_CODE_APP_NOT_READY;
    return false;
  }

  return true;
}