vnodeSync.c 24.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
// Compile-time switch for the write-queue consumer. When non-zero, the
// vnodeProposeWriteMsg variant that groups queued messages and proposes them
// through syncProposeBatch is compiled; when 0 (the current setting), the
// variant that proposes each message individually via vnodeProposeMsg is used.
#define BATCH_ENABLE 0
20

21
/*
 * Tells whether a write message of the given type should be proposed as a
 * "weak" entry. No message type currently qualifies, so every proposal made
 * from this file is strong.
 */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
22

23
/*
 * Parks the calling write thread on pVnode->syncSem after a blocking message
 * was proposed. The semaphore is posted by vnodePostBlockMsg, by role changes,
 * by vnodeSyncPreClose, or by the timeout checker.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  int32_t         vgId = pVnode->config.vgId;

  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, vgId, pMsg, TMSG_INFO(pMsg->msgType),
          pVnode->blockSec, pVnode->blockSeq);
  tsem_wait(&pVnode->syncSem);
}

30 31
/*
 * Releases a writer parked in vnodeWaitBlockMsg once a blocking message has
 * been handled. It does nothing for non-blocking message types or when no
 * writer is currently blocked. The block state is cleared under pVnode->lock.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    // Log first: the trace reports the block state that is about to be cleared.
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

46
/*
 * Answers pMsg with a redirect response. The response carries the sync
 * layer's retry endpoint set (serialized with tSerializeSEpSet) so the client
 * can resend to another replica.
 *
 * code: error reported to the client; 0 is treated as TSDB_CODE_SYN_NOT_LEADER.
 *
 * If the endpoint set cannot be serialized or allocated, the response is sent
 * with TSDB_CODE_OUT_OF_MEMORY and without the epset flag. This avoids telling
 * the client an epset is attached when the payload is empty. pMsg->code is
 * also set to the OOM code, as before.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  // A non-positive length means serialization failed; never pass it to the allocator.
  rsp.pCont = (contLen > 0) ? rpcMallocCont(contLen) : NULL;
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.info.hasEpSet = 0;  // no epset payload accompanies this response
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

75 76 77 78 79 80 81 82 83
/*
 * Applies pMsg directly on the proposing thread. vnodeProposeMsg uses this
 * path when syncPropose returns a positive code. The result is sent to the
 * client when the message carries a handle; otherwise any response payload
 * produced by vnodeProcessWriteMsg is released here.
 *
 * `inline` now precedes the return type: `static void inline` is accepted but
 * obsolescent and triggers -Wold-style-declaration.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  } else if (rsp.pCont != NULL) {
    rpcFreeCont(rsp.pCont);
  }
}

/*
 * Reports a failed proposal of pMsg back to the client.
 * NOT_LEADER and RESTORING errors become a redirect (vnodeRedirectRpcMsg).
 * Any other error is logged and returned as a plain error response, but only
 * when the message carries a reply handle.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  bool shouldRedirect = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (shouldRedirect) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  if (pMsg->info.handle == NULL) {
    return;
  }
  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  tmsgSendRsp(&rsp);
}

B
Benguang Zhao 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
/*
 * Proposes pMsg to the sync layer and handles the immediate outcome.
 *
 * Return value of syncPropose (also returned from here):
 *   > 0  the message is applied right away on this thread (vnodeHandleWriteMsg);
 *   < 0  the proposal failed; terrno, when set, replaces the code and the
 *        client is answered via vnodeHandleProposeError;
 *   == 0 the message was accepted. For a blocking message type the caller is
 *        then parked in vnodeWaitBlockMsg until the block is released.
 */
static inline int32_t vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  // Propose and arm the block state under the same lock that vnodePostBlockMsg
  // takes. Otherwise an apply that completes quickly could find blocked == false,
  // skip the post, and leave this thread waiting.
  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}

132 133
/*
 * Proposes a TDMT_VND_COMMIT message when vnodeShouldCommit says a commit is due.
 *
 * atExit == false: the commit is proposed synchronously via vnodeProposeMsg,
 *                  and the payload is freed afterwards.
 * atExit == true:  the commit is put on the vnode's WRITE_QUEUE instead. On
 *                  success the queue owns the payload; on failure it is freed
 *                  here, following the same pattern as vnodeSyncEqMsg.
 *
 * The commit schedule is refreshed with vnodeUpdCommitSched whenever a commit
 * message was built. If the payload cannot be allocated, the function returns
 * early, so the next write simply tries again.
 */
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) != 0) {
    // The queue did not take ownership; release the payload to avoid a leak.
    vError("vgId:%d, failed to put vnode commit into write queue since %s", pVnode->config.vgId, terrstr());
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  }

  vnodeUpdCommitSched(pVnode);
}

164 165
#if BATCH_ENABLE

166
/*
 * Proposes the first *arrSize queued messages in one syncProposeBatch call.
 * It then answers or applies them according to the result, frees every
 * message, and resets *arrSize to 0. Does nothing when the batch is empty.
 *
 * If the batch was accepted (code == 0) and its last message is a blocking
 * type, the caller is parked until the block is released. The block start
 * time is recorded so vnodeSyncCheckTimeout measures from now. Previously
 * blockSec stayed 0 and every check saw a huge delta.
 *
 * Fixes vs. previous revision: it called the undefined vnodeIsBlockMsg (the
 * helper is vnodeIsMsgBlock), and `inline` was placed after the return type.
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = 0;  // syncProposeBatch does not report a sequence number
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

204
/*
 * Batched write-queue consumer (compiled only when BATCH_ENABLE is non-zero).
 *
 * Takes up to numOfMsgs messages from qall and groups them for
 * vnodeProposeBatchMsg. A blocking message is always proposed in a batch of
 * its own: pending messages are flushed first, then the blocking one is
 * proposed alone.
 *
 * Each message that is rejected early (vnode not restored, out of memory,
 * pre-process failure) is answered via vnodeHandleProposeError and freed.
 *
 * Fixes vs. previous revision:
 *  - pre-process failures now send an error response, matching the non-batch
 *    variant, instead of silently dropping the request;
 *  - the trailing partial batch is flushed after the loop. Previously a
 *    `continue` on the last iteration skipped the flush and leaked the
 *    pending messages.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
              TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // A blocking message must be proposed on its own: flush what is pending first.
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // Flush any remaining batch; this is a no-op when nothing is pending.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
#else

/*
 * Write-queue consumer that proposes each message individually (used while
 * BATCH_ENABLE is 0).
 *
 * For every message taken from qall:
 *  - if the vnode is not restored yet, the client gets a RESTORING redirect;
 *  - otherwise a commit is proposed first if one is due, and the message is
 *    pre-processed. A pre-process failure is reported via
 *    vnodeHandleProposeError (terrno, when set, overrides the code); on
 *    success the message is proposed with vnodeProposeMsg.
 * In every case the message payload and queue item are freed at the end of
 * the iteration.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    bool            weak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, weak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else {
      vnodeProposeCommitOnNeed(pVnode, false);

      int32_t ret = vnodePreProcessWriteMsg(pVnode, pMsg);
      if (ret == 0) {
        ret = vnodeProposeMsg(pVnode, pMsg, weak);
        vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, ret);
      } else {
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(ret));
        if (terrno != 0) ret = terrno;
        vnodeHandleProposeError(pVnode, pMsg, ret);
      }
    }

    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

317
/*
 * Apply-queue consumer: applies committed write messages to the vnode.
 *
 * For each message taken from qall:
 *  - if it carries code 0, it is applied with vnodeProcessWriteMsg at its
 *    recorded applyIndex (an apply failure turns terrno into the response code);
 *  - a writer blocked on this message is released (vnodePostBlockMsg);
 *  - the response is sent when there is a reply handle; otherwise any
 *    response payload is freed;
 *  - the message payload and queue item are freed.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;

    if (!vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    // Only messages that arrived without an error are applied.
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont != NULL) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

361
/*
 * Hands a sync-protocol message to the sync layer (syncProcessMsg) and logs
 * any failure. pRsp is not used here.
 *
 * Returns the code from syncProcessMsg (0 on success).
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  (void)pRsp;
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t ret = syncProcessMsg(pVnode->sync, pMsg);
  if (ret == 0) {
    return 0;
  }

  vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), terrstr());
  return ret;
}

374
/*
 * Sync-layer callback that enqueues a control message on SYNC_RD_QUEUE.
 *
 * Returns -1 when pMsg or its payload is missing; the payload is not touched
 * in that case. In every other failure (no usable msgcb, or enqueue error)
 * the payload is freed and pMsg->pCont is cleared. Returns 0 on success.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}

393
/*
 * Sync-layer callback that enqueues a message on SYNC_QUEUE.
 *
 * Returns -1 when pMsg or its payload is missing; the payload is not touched
 * in that case. When msgcb is unusable or the enqueue fails, the payload is
 * freed and pMsg->pCont cleared. Returns 0 on success.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  bool    canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t ret = canEnqueue ? tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg) : -1;
  if (ret == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
411

412
/*
 * Sync-layer callback that sends a request to pEpSet via tmsgSendReq.
 * When sending fails, the payload is freed here and pMsg->pCont cleared.
 * Returns the code from tmsgSendReq.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
420

421
/* FSM callback: fills pSnapshot with the vnode's snapshot info via vnodeGetSnapshot. */
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
}

425
/*
 * Stamps pMsg with the log index/term from pMeta and forwards it to the
 * vnode's APPLY_QUEUE. vnodeApplyWriteMsg consumes that queue.
 * Returns the result of tmsgPutToQueue.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t ret = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  return ret;
}

/*
 * FSM commit callback.
 *
 * Successful entries (pMsg->code == 0) are forwarded to the apply queue via
 * vnodeSyncApplyMsg. Failed entries are not applied. Instead this callback:
 *  - releases any writer blocked on the message,
 *  - replies with the error code when a handle exists,
 *  - frees the payload and returns 0.
 */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  if (pMsg->code == 0) return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);

  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;

  vnodePostBlockMsg(pVnode, pMsg);

  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) tmsgSendRsp(&rsp);

  vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}

459
/*
 * FSM pre-commit callback. Weak entries (pMeta->isWeak == 1) are already
 * forwarded to the apply queue at this point; everything else is left for
 * the commit callback. Returns 0 for non-weak entries.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  return (pMeta->isWeak == 1) ? vnodeSyncApplyMsg(pFsm, pMsg, pMeta) : 0;
}

466 467 468 469 470
/* FSM callback: returns the vnode's applied index, read with an atomic 64-bit load. */
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
  SVnode   *vnode = pFSM->data;
  SyncIndex applied = atomic_load_64(&vnode->state.applied);
  return applied;
}

471
/* FSM rollback callback: only traces the rolled-back entry; no vnode state is changed here. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  int32_t vgId = pVnode->config.vgId;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vgId,
         pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
478
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
479 480
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
481
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
482 483
  return code;
}
S
Shengliang Guan 已提交
484

485
/* FSM callback: closes a snapshot reader opened by vnodeSnapshotStartRead. */
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
  vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
489

S
Shengliang Guan 已提交
490
/*
 * FSM callback: reads the next snapshot chunk into *ppBuf, with its length in
 * *len. Returns the code from vnodeSnapRead.
 */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
S
Shengliang Guan 已提交
495

S
Shengliang Guan 已提交
496
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
497 498
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
499 500 501

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
502
    if (itemSize == 0) {
S
Shengliang Guan 已提交
503
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
504 505
      break;
    } else {
S
Shengliang Guan 已提交
506
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
507 508 509 510
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
511 512 513
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
514

S
Shengliang Guan 已提交
515
/*
 * FSM callback: closes the snapshot writer. It passes !isApply as the second
 * argument of vnodeSnapWriterClose; presumably that argument is a
 * rollback/discard flag, to be confirmed against vnodeSnapWriterClose.
 * Returns the close result.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  bool    rollback = !isApply;
  int32_t ret = vnodeSnapWriterClose(pWriter, rollback, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, ret);
  return ret;
}
S
Shengliang Guan 已提交
524

S
Shengliang Guan 已提交
525
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
526
  SVnode *pVnode = pFsm->data;
527
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
528
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
529
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
530 531
  return code;
}
S
Shengliang Guan 已提交
532

533
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
534
  SVnode *pVnode = pFsm->data;
535
  SyncIndex appliedIdx = -1;
536 537

  do {
538 539 540
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    ASSERT(appliedIdx <= commitIdx);
    if (appliedIdx == commitIdx) {
541
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
542 543
      break;
    } else {
544 545 546
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
            ", applied-index:%" PRId64,
            pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
547 548 549 550
      taosMsleep(10);
    }
  } while (true);

551
  ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
552
  walApplyVer(pVnode->pWal, commitIdx);
553

554
  pVnode->restored = true;
555
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
556 557

  // start to restore all stream tasks
558
  tqStartStreamTasks(pVnode->pTq);
559 560
}

S
Shengliang Guan 已提交
561
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
562
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
563
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
564

565
  taosThreadMutexLock(&pVnode->lock);
566 567
  if (pVnode->blocked) {
    pVnode->blocked = false;
568
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
569 570
    tsem_post(&pVnode->syncSem);
  }
571
  taosThreadMutexUnlock(&pVnode->lock);
572 573
}

S
Shengliang Guan 已提交
574
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
575 576 577
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
578

579 580
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: returns the number of items in the vnode's APPLY_QUEUE, or -1
 * when there is no vnode or no queue-size function to ask.
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  if (pVnode == NULL || pVnode->msgCb.qsizeFp == NULL) {
    return -1;
  }
  return tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
}

601
/*
 * Allocates the state-machine callback table that the sync layer drives for
 * this vnode. Leader-transfer and reconfig callbacks are left NULL.
 *
 * Returns the zero-initialized, filled-in table, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY if the allocation fails. Previously a failed
 * allocation was dereferenced immediately.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = NULL;

  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Opens the sync instance for this vnode. The sync directory is path + TD_DIRSEP
 * + "sync", and the sync layer is wired to this file's queue/send callbacks
 * and to the FSM from vnodeSyncMakeFsm. The replica layout is logged.
 *
 * Returns 0 on success. Returns -1 if the FSM cannot be allocated (terrno =
 * TSDB_CODE_OUT_OF_MEMORY; previously a NULL FSM was passed on to syncOpen)
 * or if syncOpen fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
662
/* Starts the vnode's sync instance. Returns 0 on success, -1 (with an error log) if syncStart fails. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  int32_t ret = syncStart(pVnode->sync);
  if (ret >= 0) {
    return 0;
  }

  vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
671
/*
 * First phase of sync shutdown: attempts a leader transfer, pre-stops sync,
 * and releases any writer still parked in vnodeWaitBlockMsg.
 *
 * The full block state is cleared (blocked/blockSec/blockSeq), as
 * vnodePostBlockMsg and vnodeSyncCheckTimeout do, so stale values do not
 * linger after the release.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

685
/* Second phase of sync shutdown: logs and calls syncPostStop on the vnode's sync instance. */
void vnodeSyncPostClose(SVnode *pVnode) {
  int64_t sync = pVnode->sync;
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(sync);
}

S
Shengliang Guan 已提交
690
/* Stops the vnode's sync instance via syncStop, after logging. */
void vnodeSyncClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, close sync", vgId);
  syncStop(pVnode->sync);
}
694

S
Shengliang Guan 已提交
695 696 697 698 699 700 701 702 703
/*
 * Releases a blocked writer whose proposal has waited more than
 * VNODE_TIMEOUT_SEC seconds.
 *
 * On timeout it asks the sync layer to send a timeout response for blockSeq,
 * clears the block state, and posts syncSem. All of this happens under
 * pVnode->lock.
 *
 * The previously disabled (#if 0) code referenced a non-existent blockInfo
 * field and has been removed. A non-zero result from syncSendTimeoutRsp is now
 * logged instead of ignored.
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
        // NOTE(review): presumably no pending request matched blockSeq (e.g. it was
        // already applied and answered); confirm against syncSendTimeoutRsp.
        vDebug("vgId:%d, timeout response not sent by sync, seq:%" PRId64, pVnode->config.vgId, pVnode->blockSeq);
      }
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      tsem_post(&pVnode->syncSem);
    }
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

721 722 723 724
/* Returns true when the sync layer reports this replica's role as leader (restore state is not checked). */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
725

726
/*
 * Returns true only when this replica is leader AND both the sync layer and
 * the vnode report restore complete.
 *
 * terrno is cleared first. On a false return it explains why:
 *   - left as set by syncGetState when that call fails (logged as stopping);
 *   - TSDB_CODE_SYN_NOT_LEADER when the role is not leader;
 *   - TSDB_CODE_SYN_RESTORING when restore is still in progress.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState st = syncGetState(pVnode->sync);
  int32_t    vgId = pVnode->config.vgId;

  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  bool isLeader = (st.state == TAOS_SYNC_STATE_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(st.state));
    return false;
  }

  bool fullyRestored = st.restored && pVnode->restored;
  if (!fullyRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, st.restored, pVnode->restored);
    return false;
  }

  return true;
}