vnodeSync.c 24.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
#define BATCH_ENABLE 0
20

21
/* Tells whether a message type should be proposed as "weak".
 * The answer is currently `false` for every type. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
22

23
/*
 * Trace that a blocking message is about to be waited on, then wait on
 * pVnode->syncSem.
 *
 * pVnode  vnode whose semaphore is waited on; blockSec/blockSeq are logged.
 * pMsg    message that triggered the wait (used for tracing only).
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  // `trace` is presumably picked up by name inside the vG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
  tsem_wait(&pVnode->syncSem);
}

30 31
/*
 * Release a pending block for a blocking message type.
 *
 * Does nothing unless vnodeIsMsgBlock(pMsg->msgType) is true. Otherwise,
 * under pVnode->lock, if the vnode is marked blocked it clears the
 * blocked/blockSec/blockSeq state and posts pVnode->syncSem.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

46
/*
 * Answer a request with a redirect response carrying the retry endpoint set
 * obtained from syncGetRetryEpSet().
 *
 * pVnode  vnode whose sync handle supplies the endpoint set.
 * pMsg    request being redirected; its info.hasEpSet is set to 1.
 * code    response code; 0 is replaced by TSDB_CODE_SYN_NOT_LEADER.
 *
 * The response uses msgType + 1 and a serialized SEpSet as its payload.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  // First call with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    // No payload could be built, so the response must not advertise one.
    rsp.info.hasEpSet = 0;
    // NOTE(review): rsp.code keeps the redirect code; pMsg->code is not part
    // of the response sent below — confirm whether OOM should be reported.
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

75 76 77 78 79 80 81 82 83
/*
 * Process a write message immediately via vnodeProcessWriteMsg() and deliver
 * the result.
 *
 * On failure the response code is taken from terrno. The response is sent
 * only when the request carries an RPC handle; otherwise any response
 * content produced is freed here.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  } else if (rsp.pCont) {
    // Nobody to reply to: drop the response payload.
    rpcFreeCont(rsp.pCont);
  }
}

/*
 * React to a failed proposal.
 *
 * TSDB_CODE_SYN_NOT_LEADER and TSDB_CODE_SYN_RESTORING are answered with a
 * redirect (vnodeRedirectRpcMsg). Any other code is logged and, when the
 * request has an RPC handle, returned to the sender as-is.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
}

B
Benguang Zhao 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
/*
 * Propose one message through syncPropose() and deal with the outcome.
 *
 *   result == 0 : accepted; if the type is a blocking one, the vnode is marked
 *                 blocked (under pVnode->lock) and this call waits afterwards.
 *   result  > 0 : the message is handled right away by vnodeHandleWriteMsg().
 *   result  < 0 : failure; terrno (when set) replaces the code and the error
 *                 is reported through vnodeHandleProposeError().
 *
 * Returns the final code.
 */
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t proposeSeq = 0;
  bool    mustBlock = false;

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &proposeSeq);
  if (code == 0 && vnodeIsMsgBlock(pMsg->msgType)) {
    mustBlock = true;
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = proposeSeq;
#if 0
    pVnode->blockInfo = pMsg->info;
#endif
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code < 0) {
    if (terrno != 0) {
      code = terrno;
    }
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  }

  // The wait happens outside the lock so the poster can take it.
  if (mustBlock) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  }
  return code;
}

/*
 * Propose a TDMT_VND_COMMIT message when vnodeShouldCommit() says so.
 *
 * The message body is a bare SMsgHead and is marked noResp. Whatever the
 * proposal result, vnodeUpdCommitSched() is called and the message content is
 * freed here. If the content cannot be allocated, the function logs and
 * returns without touching the commit schedule.
 */
void vnodeProposeCommitOnNeed(SVnode *pVnode) {
  if (!vnodeShouldCommit(pVnode)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Previously dereferenced unconditionally; bail out instead of crashing.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  bool isWeak = false;
  if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
    vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
  } else {
    vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId);
  }

  vnodeUpdCommitSched(pVnode);
  rpcFreeCont(rpcMsg.pCont);
  rpcMsg.pCont = NULL;
}

162 163
#if BATCH_ENABLE

164
/*
 * Propose the queued messages pMsgArr[0 .. *arrSize-1] as one batch through
 * syncProposeBatch(), then free every message and reset *arrSize to 0.
 *
 *   result == 0 : accepted; if the LAST message is a blocking type the vnode
 *                 is marked blocked and this call waits afterwards.
 *   result  > 0 : each message is handled right away.
 *   result  < 0 : each message is reported via vnodeHandleProposeError().
 *
 * Does nothing when *arrSize <= 0.
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    // Record the start time so vnodeSyncCheckTimeout() measures from now
    // rather than from 0. The batch API yields no sequence number.
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = 0;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

202
/*
 * Batch variant (compiled when BATCH_ENABLE is non-zero): drain up to
 * numOfMsgs items from `qall`, pre-process each and propose them in batches.
 *
 * A blocking message is always proposed in a batch of its own: the pending
 * batch is flushed before it is queued and again right after. Messages that
 * cannot be proposed (vnode not restored, array allocation failed,
 * pre-process error) are answered with an error and freed on the spot.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    vnodeProposeCommitOnNeed(pVnode);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Answer the sender, as the non-batch variant does; otherwise the
      // message is dropped without a reply.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      // Flush what is pending so the blocking message goes alone.
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // The last queue slot may have been skipped by a `continue` above, leaving
  // queued messages behind; propose them before the arrays are released.
  if (arrayPos > 0) {
    vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
#else

/*
 * Non-batch variant: drain up to numOfMsgs items from `qall` and propose them
 * one at a time.
 *
 * Each message is freed here after it has been proposed or rejected. A
 * message is rejected (and answered through vnodeHandleProposeError) when the
 * vnode is not restored yet or when pre-processing fails.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    vnodeProposeCommitOnNeed(pVnode);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

313
/*
 * Drain up to numOfMsgs items from the apply queue and apply each one.
 *
 * A message whose code is already non-zero is not processed; its code is
 * passed through to the response. After processing, a pending block is
 * released (vnodePostBlockMsg), the response is sent if the request has an
 * RPC handle (otherwise any response content is freed), and the message
 * itself is freed.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
        rsp.code = terrno;
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
      }
    }

    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

357
/*
 * Hand a sync message to syncProcessMsg() and log a failure.
 *
 * pRsp is not used by this function.
 * Returns the code from syncProcessMsg().
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code != 0) {
    vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), terrstr());
  }

  return code;
}

370
/*
 * Enqueue a message on SYNC_CTRL_QUEUE.
 *
 * Returns -1 when pMsg or its content is NULL, or when no queue callback is
 * available; otherwise the code from tmsgPutToQueue(). Whenever the message
 * is not enqueued but has content, that content is freed and pMsg->pCont is
 * set to NULL.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return -1;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

389
/*
 * Enqueue a message on SYNC_QUEUE.
 *
 * Returns -1 when pMsg or its content is NULL, or when no queue callback is
 * available; otherwise the code from tmsgPutToQueue(). Whenever the message
 * is not enqueued but has content, that content is freed and pMsg->pCont is
 * set to NULL.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return -1;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
407

408
/*
 * Send a request to the given endpoint set with tmsgSendReq().
 * On failure the message content is freed and pMsg->pCont set to NULL.
 * Returns the code from tmsgSendReq().
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
416

417
/* FSM callback: fill *pSnapshot via vnodeGetSnapshot() for the vnode stored in pFsm->data. */
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  vnodeGetSnapshot(pFsm->data, pSnapshot);
}

421
/*
 * Route a message coming back from the sync layer.
 *
 * The apply index/term from pMeta are stamped on the message. When
 * pMsg->code is 0 the message is put on APPLY_QUEUE and that call's result
 * is returned. Otherwise the message is finished here: a pending block is
 * released, the code is returned to the sender (if there is an RPC handle),
 * the content is freed, and 0 is returned.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  if (pMsg->code == 0) {
    return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  }

  vnodePostBlockMsg(pVnode, pMsg);
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}

446 447
/* FSM commit callback: forwards unchanged to vnodeSyncApplyMsg(). */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

450
/*
 * FSM pre-commit callback: only entries flagged weak (pMeta->isWeak == 1)
 * are forwarded to vnodeSyncApplyMsg() at this stage; for all others it
 * returns 0 without doing anything.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  if (pMeta->isWeak == 1) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }
  return 0;
}

457 458 459 460 461
/* FSM callback: atomically read and return the vnode's state.applied index. */
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
  SVnode   *pOwner = pFSM->data;
  SyncIndex appliedIndex = atomic_load_64(&pOwner->state.applied);
  return appliedIndex;
}

462
/* FSM rollback callback: only traces the event; no state is changed here. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
469
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
470 471
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
472
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
473 474
  return code;
}
S
Shengliang Guan 已提交
475

476
/* FSM callback: close the snapshot reader. pFsm is not needed for this. */
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
  vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
480

S
Shengliang Guan 已提交
481
/*
 * FSM callback: read the next snapshot chunk through vnodeSnapRead(), which
 * receives ppBuf and len as its output arguments. pFsm is not needed.
 * Returns the code from vnodeSnapRead().
 */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
  int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
  return code;
}
S
Shengliang Guan 已提交
486

S
Shengliang Guan 已提交
487
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
488 489
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
490 491 492

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
493
    if (itemSize == 0) {
S
Shengliang Guan 已提交
494
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
495 496
      break;
    } else {
S
Shengliang Guan 已提交
497
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
498 499 500 501
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
502 503 504
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
505

S
Shengliang Guan 已提交
506
/*
 * FSM callback: close the snapshot writer.
 *
 * vnodeSnapWriterClose() receives the NEGATION of isApply as its second
 * argument. Returns the code from vnodeSnapWriterClose().
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
  return code;
}
S
Shengliang Guan 已提交
515

S
Shengliang Guan 已提交
516
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
517
  SVnode *pVnode = pFsm->data;
518
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
519
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
520
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
521 522
  return code;
}
S
Shengliang Guan 已提交
523

524
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
525
  SVnode *pVnode = pFsm->data;
526
  SyncIndex appliedIdx = -1;
527 528

  do {
529 530 531 532
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    ASSERT(appliedIdx <= commitIdx);
    if (appliedIdx == commitIdx) {
      vInfo("vgId:%d, no more items to be applied, restore finish", pVnode->config.vgId);
533 534
      break;
    } else {
535 536 537 538
      int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
      vInfo("vgId:%d, restore not finish since %" PRId64
            " items to be applied, and %d in apply queue. commit-index:%" PRId64 ", applied-index:%" PRId64,
            pVnode->config.vgId, commitIdx - appliedIdx, itemSize, commitIdx, appliedIdx);
539 540 541 542
      taosMsleep(10);
    }
  } while (true);

543 544
  ASSERT(appliedIdx == commitIdx);
  walApplyVer(pVnode->pWal, commitIdx);
545

546
  pVnode->restored = true;
547
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
548 549
}

S
Shengliang Guan 已提交
550
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
551
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
552
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
553

554
  taosThreadMutexLock(&pVnode->lock);
555 556
  if (pVnode->blocked) {
    pVnode->blocked = false;
557
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
558 559
    tsem_post(&pVnode->syncSem);
  }
560
  taosThreadMutexUnlock(&pVnode->lock);
561 562
}

S
Shengliang Guan 已提交
563
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
564 565 566
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
567

568 569
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: return the number of items in the apply queue, or -1 when
 * the vnode or its queue-size callback is missing.
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return itemSize;
  } else {
    return -1;
  }
}

590
/*
 * Allocate an SSyncFSM and wire it to this file's callbacks, with pVnode as
 * its user data.
 *
 * Returns the new FSM, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
 * allocation fails. FpLeaderTransferCb and FpReConfigCb are left NULL.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Previously dereferenced unconditionally.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Build the SSyncInfo for this vnode and open its sync instance.
 *
 * pVnode  vnode to attach; on success pVnode->sync holds the handle.
 * path    directory under which the "sync" sub-directory path is formed.
 *
 * Returns 0 on success, -1 when the FSM cannot be created or syncOpen()
 * does not return a positive handle.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL state machine to syncOpen().
    if (terrno == 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    // NOTE(review): ownership of syncInfo.pFsm on this path is not visible
    // here — confirm whether it must be freed by the caller.
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
651
/* Start the vnode's sync instance. Returns 0 on success, -1 if syncStart() fails. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
  if (syncStart(pVnode->sync) < 0) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
660
/*
 * First stage of closing sync: call syncLeaderTransfer() and syncPreStop(),
 * then, under pVnode->lock, clear a pending block and post pVnode->syncSem.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

674
/* Log and call syncPostStop() on the vnode's sync handle. */
void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(pVnode->sync);
}

S
Shengliang Guan 已提交
679
/* Log and call syncStop() on the vnode's sync handle. */
void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
  syncStop(pVnode->sync);
}
683

S
Shengliang Guan 已提交
684 685 686 687 688 689 690 691 692
/*
 * Release a block that has been pending for more than VNODE_TIMEOUT_SEC
 * seconds.
 *
 * Under pVnode->lock: if the vnode is blocked and the elapsed time since
 * blockSec exceeds the limit, syncSendTimeoutRsp() is called for blockSeq,
 * the block state is cleared and pVnode->syncSem is posted. A non-zero
 * result from syncSendTimeoutRsp() is currently ignored (the handling is
 * compiled out).
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
      }
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      tsem_post(&pVnode->syncSem);
    }
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

710 711 712 713
/* True when syncGetState() reports TAOS_SYNC_STATE_LEADER for this vnode. */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  SSyncState current = syncGetState(pVnode->sync);
  if (current.state == TAOS_SYNC_STATE_LEADER) {
    return true;
  }
  return false;
}
714

715
/*
 * Tell whether this vnode is a leader that has finished restoring.
 *
 * terrno is reset to 0 first. Returns false when:
 *   - syncGetState() leaves terrno non-zero (logged as "vnode is stopping");
 *   - the state is not leader (terrno = TSDB_CODE_SYN_NOT_LEADER);
 *   - either the sync state or the vnode is not restored
 *     (terrno = TSDB_CODE_SYN_RESTORING).
 * Returns true otherwise.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState state = syncGetState(pVnode->sync);

  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
    return false;
  }

  if (!state.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
    return false;
  }

  return true;
}