vnodeSync.c 25.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
// Compile-time switch between the two vnodeProposeWriteMsg implementations below:
// non-zero selects the batched path (vnodeProposeBatchMsg), 0 selects the
// one-message-at-a-time path in the #else branch.
#define BATCH_ENABLE 0
20

21
/*
 * Tells whether a write message should be proposed as "weak". Weak proposals
 * are applied from the pre-commit callback (vnodeSyncPreCommitMsg). At present
 * no message type is weak, so the type argument is ignored.
 */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
22

23
/*
 * Parks the calling write thread on pVnode->syncSem until the outstanding
 * blocking proposal is released (by vnodePostBlockMsg, a role change,
 * pre-close, or the timeout checker, all of which post syncSem).
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro
  int32_t         vgId = pVnode->config.vgId;

  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, vgId, pMsg, TMSG_INFO(pMsg->msgType),
          pVnode->blockSec, pVnode->blockSeq);

  tsem_wait(&pVnode->syncSem);
}

30 31
/*
 * Wakes the write thread waiting in vnodeWaitBlockMsg after a blocking message
 * has been applied or rejected. Messages whose type is not blocking are
 * ignored. The block state is cleared and the semaphore posted while holding
 * pVnode->lock, and only if a block is actually pending.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro

  taosThreadMutexLock(&pVnode->lock);
  bool pending = pVnode->blocked;
  if (pending) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

46
/*
 * Answers a request this replica cannot serve (not leader, or still
 * restoring). The response carries the sync module's retry epset so the
 * client can resend elsewhere.
 *
 * code: error to report; 0 is replaced by TSDB_CODE_SYN_NOT_LEADER.
 *
 * A response is always sent through tmsgSendRsp. If the epset payload cannot
 * be allocated, the response instead carries TSDB_CODE_OUT_OF_MEMORY with the
 * epset flag cleared, since there is no payload for the receiver to decode.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace/vGError macros
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    // Report the failure on the response itself: pMsg is not what gets sent back,
    // and advertising an epset with no payload would leave nothing to decode.
    vGError("vgId:%d, msg:%p failed to redirect since out of memory", pVnode->config.vgId, pMsg);
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.info.hasEpSet = 0;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

75 76 77 78 79 80 81 82 83
/*
 * Applies a write message in place. vnodeProposeMsg calls this when the
 * propose step returns a positive code. The response is then sent, or its
 * payload freed when the request has no handle to reply to.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  int32_t ret = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
  if (ret < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGError macro
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle == NULL) {
    // Nobody to reply to: just release whatever payload the apply produced.
    if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }
    return;
  }

  tmsgSendRsp(&rsp);
}

/*
 * Reports a failed proposal back to the client.
 * TSDB_CODE_SYN_NOT_LEADER and TSDB_CODE_SYN_RESTORING become a redirect
 * (vnodeRedirectRpcMsg). Any other code is logged and returned as a plain
 * error response when the request carries a handle.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  bool redirect = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (redirect) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGError macro
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle == NULL) {
    return;
  }
  tmsgSendRsp(&rsp);
}

B
Benguang Zhao 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
/*
 * Proposes a single write message to the sync module and handles the result.
 *
 * syncPropose runs under pVnode->lock. If it accepts (returns 0) a message of
 * a blocking type, the block state (blocked/blockSec/blockSeq) is recorded
 * before the lock is released.
 *
 * Result handling:
 * - A positive result is applied right here through vnodeHandleWriteMsg.
 * - A negative result is reported through vnodeHandleProposeError, preferring
 *   terrno when it is set.
 *
 * For an accepted blocking message the caller then waits in vnodeWaitBlockMsg.
 * Returns the (possibly terrno-adjusted) propose code.
 */
static inline int32_t vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;
  bool    mustWait = false;

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  if (code == 0 && vnodeIsMsgBlock(pMsg->msgType)) {
    ASSERT(!pVnode->blocked);
    mustWait = true;
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  }

  if (mustWait) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  }
  return code;
}

129 130
/*
 * Issues a TDMT_VND_COMMIT message when vnodeShouldCommit() reports that a
 * commit is due.
 *
 * atExit == false: the commit is proposed synchronously via vnodeProposeMsg
 *                  and the payload is freed here afterwards.
 * atExit == true:  the message is put on the vnode WRITE_QUEUE instead.
 *                  If queueing fails, the payload is released here, matching
 *                  the failure handling in vnodeSyncEqMsg.
 */
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // Allocation can fail; writing through a NULL head would crash the write thread.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) != 0) {
      vError("vgId:%d, failed to put vnode commit to write queue since %s", pVnode->config.vgId, terrstr());
      rpcFreeCont(rpcMsg.pCont);
      rpcMsg.pCont = NULL;
    }
  }
}

159 160
#if BATCH_ENABLE

161
/*
 * Proposes the *arrSize messages collected in pMsgArr as one batch.
 *
 * - Each message's outcome is handled: applied in place on a positive result,
 *   reported as an error on a negative one.
 * - If the batch is accepted and its last message is a blocking type, this
 *   waits until that message is released.
 * - Finally every message is freed and *arrSize is reset to 0.
 *
 * Does nothing when *arrSize <= 0.
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // vnodeIsMsgBlock is the predicate used everywhere else in this file.
  bool wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    // vnodeSyncCheckTimeout measures the wait from blockSec. Leaving it at 0
    // would make the block look expired on the very next check.
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = 0;  // syncProposeBatch, as called here, reports no sequence number
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

199
/*
 * Batched write-queue consumer (BATCH_ENABLE build).
 *
 * Drains up to numOfMsgs items from qall and accumulates them in pMsgArr.
 * Pending messages are proposed through vnodeProposeBatchMsg in three cases:
 * - before a blocking message is added;
 * - right after a blocking message is added, so it forms its own batch;
 * - once more after the loop, so nothing stays pending when the last queue
 *   item was skipped.
 *
 * Messages rejected early (restore not finished, out of memory, pre-process
 * failure) get an error response and are freed immediately.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));

  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace/vGError macros
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Answer the client like the non-batch path does instead of dropping the request silently.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // Flush whatever is still pending. Relying on "msg == numOfMsgs - 1" inside the
  // loop misses this when the final queue item takes one of the `continue` paths.
  // vnodeProposeBatchMsg returns immediately when arrayPos is 0.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
#else

/*
 * Write-queue consumer (default, non-batched build).
 *
 * Each message taken from qall is handled one at a time:
 * - If the vnode has not finished restoring, the message is answered with
 *   TSDB_CODE_SYN_RESTORING.
 * - Otherwise a pending commit may be proposed first
 *   (vnodeProposeCommitOnNeed), then the message is pre-processed.
 * - A pre-process failure is reported to the client; on success the message
 *   is proposed through vnodeProposeMsg.
 *
 * The message is freed afterwards in every case.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace/vGError macros
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    vnodeProposeCommitOnNeed(pVnode, false);

    int32_t code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
    } else {
      code = vnodeProposeMsg(pVnode, pMsg, isWeak);
      vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    }

    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

312
/*
 * Apply-queue consumer. Each committed message taken from qall is handled in
 * four steps:
 * 1. Execute it through vnodeProcessWriteMsg at its recorded apply index,
 *    but only when the message carries no error code.
 * 2. Release a waiting writer (vnodePostBlockMsg).
 * 3. Send the response, or free its payload when there is no handle.
 * 4. Free the message.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace/vGError macros

    if (!vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle == NULL) {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
    } else {
      tmsgSendRsp(&rsp);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

356
/*
 * Hands a sync-protocol message to the sync module and logs a failure.
 * pRsp is not used by this implementation.
 * Returns the syncProcessMsg result.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  (void)pRsp;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace/vGError macros
  int32_t         vgId = pVnode->config.vgId;

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code == 0) {
    return 0;
  }

  vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
          terrstr());
  return code;
}

369
/*
 * Enqueues a sync control message on SYNC_RD_QUEUE.
 * Returns -1 without touching anything when pMsg or its payload is missing.
 * If there is no usable queue callback, or enqueueing fails, the payload is
 * freed here and pMsg->pCont cleared. Returns 0 on success.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  int32_t code = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

388
/*
 * Enqueues a sync message on SYNC_QUEUE.
 * Returns -1 without touching anything when pMsg or its payload is missing.
 * If there is no usable queue callback, or enqueueing fails, the payload is
 * freed here and pMsg->pCont cleared. Returns 0 on success.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  bool    canQueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t code = canQueue ? tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg) : -1;

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
406

407
/*
 * Sends a sync request to pEpSet. On failure the payload is freed here and
 * pMsg->pCont cleared. Returns the tmsgSendReq result.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
415

416
/* FSM callback: fills pSnapshot from the vnode stored in pFsm->data. */
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
}

420
/*
 * Stamps the committed log position (pMeta->index / pMeta->term) onto the
 * message and forwards it to the vnode APPLY_QUEUE.
 * Returns the tmsgPutToQueue result.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro

  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}

434
/*
 * FSM commit callback.
 *
 * A message without an error code is forwarded to the apply queue
 * (vnodeSyncApplyMsg). A message carrying an error code is not applied;
 * instead this:
 * - releases any blocked writer;
 * - answers the client with that code when a handle is present;
 * - frees the payload.
 *
 * The error path always returns 0.
 */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code != 0) {
    SVnode         *pVnode = pFsm->data;
    const STraceId *trace = &pMsg->info.traceId;  // consumed by the vGTrace macro

    vnodePostBlockMsg(pVnode, pMsg);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return 0;
  }

  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

454
/*
 * FSM pre-commit callback. Only weak proposals (pMeta->isWeak == 1) are
 * forwarded to the apply queue at this stage; everything else returns 0.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

461 462 463 464 465
/* FSM callback: atomically reads the vnode's applied index (state.applied). */
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
  SVnode   *pVnode = pFSM->data;
  SyncIndex applied = atomic_load_64(&pVnode->state.applied);
  return applied;
}

466
/* FSM rollback callback: only traces the event; no state is changed. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  int32_t vgId = pVnode->config.vgId;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
473
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
474 475
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
476
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
477 478
  return code;
}
S
Shengliang Guan 已提交
479

480
/* FSM callback: closes a reader previously opened by vnodeSnapshotStartRead. */
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;  // the vnode itself is not needed to close the reader
  vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
484

S
Shengliang Guan 已提交
485
/*
 * FSM callback: reads the next snapshot chunk into *ppBuf and stores its
 * length in *len. Returns the vnodeSnapRead result.
 */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;  // the vnode itself is not needed to read from the reader
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
S
Shengliang Guan 已提交
490

S
Shengliang Guan 已提交
491
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
492 493
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
494 495 496

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
497
    if (itemSize == 0) {
S
Shengliang Guan 已提交
498
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
499 500
      break;
    } else {
S
Shengliang Guan 已提交
501
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
502 503 504 505
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
506 507 508
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
509

S
Shengliang Guan 已提交
510
/*
 * FSM callback: closes the snapshot writer. The writer is told to roll back
 * when isApply is false. Returns the vnodeSnapWriterClose result.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  bool    rollback = !isApply;
  int32_t code = vnodeSnapWriterClose(pWriter, rollback, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", vgId, code);
  return code;
}
S
Shengliang Guan 已提交
519

S
Shengliang Guan 已提交
520
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
521
  SVnode *pVnode = pFsm->data;
522
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
523
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
524
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
525 526
  return code;
}
S
Shengliang Guan 已提交
527

528
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
529
  SVnode *pVnode = pFsm->data;
530
  SyncIndex appliedIdx = -1;
531 532

  do {
533 534 535
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    ASSERT(appliedIdx <= commitIdx);
    if (appliedIdx == commitIdx) {
536
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
537 538
      break;
    } else {
539 540 541
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
            ", applied-index:%" PRId64,
            pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
542 543 544 545
      taosMsleep(10);
    }
  } while (true);

546
  ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
547
  walApplyVer(pVnode->pWal, commitIdx);
548

549
  pVnode->restored = true;
550
  vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
551 552

  // start to restore all stream tasks
553
  if (tsDisableStream) {
554
    vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
555
  } else {
556
    vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
557
    tqCheckStreamStatus(pVnode->pTq);
558
  }
559 560
}

S
Shengliang Guan 已提交
561
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
562
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
563
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
564

565
  taosThreadMutexLock(&pVnode->lock);
566 567
  if (pVnode->blocked) {
    pVnode->blocked = false;
568
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
569 570
    tsem_post(&pVnode->syncSem);
  }
571
  taosThreadMutexUnlock(&pVnode->lock);
572 573
}

C
cadem 已提交
574 575 576 577 578 579 580 581 582 583 584 585 586
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, become learner", pVnode->config.vgId);

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    pVnode->blocked = false;
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
587
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
588 589 590
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
591

592 593
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: number of items in the vnode APPLY_QUEUE, or -1 when the
 * queue cannot be queried (no vnode or no qsizeFp callback).
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  bool    canQuery = (pVnode != NULL) && (pVnode->msgCb.qsizeFp != NULL);
  return canQuery ? tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE) : -1;
}

614
/*
 * Allocates the sync state-machine descriptor for pVnode and wires every FSM
 * callback to the vnode implementations in this file.
 * Returns NULL (with terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Writing the callbacks through a NULL pointer would crash; let the caller fail instead.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Builds the SSyncInfo for this vnode and opens its sync node.
 * - Sync state lives under "<path><DIRSEP>sync".
 * - The FSM comes from vnodeSyncMakeFsm.
 * - Every configured replica (totalReplicaNum entries) is logged.
 * Returns 0 on success, or -1 if the FSM cannot be created or syncOpen fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL FSM to syncOpen.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to create sync fsm since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
676
/* Starts the vnode's sync node. Returns 0 on success, -1 on failure. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, start sync", vgId);

  if (syncStart(pVnode->sync) >= 0) {
    return 0;
  }

  vError("vgId:%d, failed to start sync subsystem since %s", vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
685
/*
 * First stage of closing sync: calls syncLeaderTransfer and syncPreStop.
 * Then, under pVnode->lock, releases any write thread still waiting on a
 * blocking proposal so it cannot stay parked during shutdown.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    // Reset the block bookkeeping too, as vnodePostBlockMsg and vnodeSyncCheckTimeout do.
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

699
/* Second stage of closing sync: logs and calls syncPostStop. */
void vnodeSyncPostClose(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, sync post close", vgId);
  syncPostStop(pVnode->sync);
}

S
Shengliang Guan 已提交
704
/* Stops the vnode's sync node (syncStop). */
void vnodeSyncClose(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, close sync", vgId);
  syncStop(pVnode->sync);
}
708

S
Shengliang Guan 已提交
709 710 711 712 713 714 715 716 717
/*
 * Periodic check for a blocking proposal that has waited longer than
 * VNODE_TIMEOUT_SEC (measured from blockSec). When that happens, under
 * pVnode->lock, this:
 * - asks sync to send a timeout response for blockSeq;
 * - clears the block state;
 * - posts syncSem so the waiting write thread resumes.
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
        // NOTE(review): a non-zero result presumably means the blocked message was already
        // applied and answered (assumption, confirm against syncSendTimeoutRsp); just record it.
        vInfo("vgId:%d, timeout response not sent, seq:%" PRId64, pVnode->config.vgId, pVnode->blockSeq);
      }
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      tsem_post(&pVnode->syncSem);
    }
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

735 736 737 738
/*
 * True when the sync state reports TAOS_SYNC_STATE_LEADER. Unlike
 * vnodeIsLeader, restore status is not checked.
 */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  SSyncState st = syncGetState(pVnode->sync);
  bool       isLeader = (st.state == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}
739

740
/*
 * True only when this replica is the sync leader and both the sync node and
 * the vnode report restored. On a false result terrno is set as follows:
 * - left as set by syncGetState when that call sets it (logged as stopping);
 * - TSDB_CODE_SYN_NOT_LEADER when this replica is not the leader;
 * - TSDB_CODE_SYN_RESTORING when restore is incomplete.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;

  terrno = 0;
  SSyncState st = syncGetState(pVnode->sync);

  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  if (st.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(st.state));
    return false;
  }

  bool fullyRestored = st.restored && pVnode->restored;
  if (!fullyRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, st.restored, pVnode->restored);
    return false;
  }

  return true;
}