vnodeSync.c 24.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
#define BATCH_ENABLE 0
20

21
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
S
Shengliang Guan 已提交
22

23
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
24
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
25 26
  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
27 28 29
  tsem_wait(&pVnode->syncSem);
}

30 31
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (vnodeIsMsgBlock(pMsg->msgType)) {
32
    const STraceId *trace = &pMsg->info.traceId;
33
    taosThreadMutexLock(&pVnode->lock);
34
    if (pVnode->blocked) {
35 36
      vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
37
      pVnode->blocked = false;
38 39
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
40 41
      tsem_post(&pVnode->syncSem);
    }
42
    taosThreadMutexUnlock(&pVnode->lock);
43 44 45
  }
}

46
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
47 48 49 50 51 52 53 54 55 56 57 58
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

59 60 61
  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
62 63 64 65 66 67 68 69 70 71 72
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
73 74
}

75 76 77 78 79 80 81 82 83
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
84 85 86 87
  } else {
    if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }
88 89 90 91
  }
}

static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
92
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
93
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
94 95 96 97 98 99 100 101 102 103
  } else {
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
  }
}

B
Benguang Zhao 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
#if 0
    pVnode->blockInfo = pMsg->info;
#endif
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}

void vnodeProposeCommitOnNeed(SVnode *pVnode) {
  if (!vnodeShouldCommit(pVnode)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  bool isWeak = false;
  if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
    vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    goto _out;
  }

  vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId);

_out:
157
  vnodeUpdCommitSched(pVnode);
B
Benguang Zhao 已提交
158 159 160 161
  rpcFreeCont(rpcMsg.pCont);
  rpcMsg.pCont = NULL;
}

162 163
#if BATCH_ENABLE

164
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
165
  if (*arrSize <= 0) return;
166
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
167

168
  taosThreadMutexLock(&pVnode->lock);
169
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
170 171
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
  if (wait) {
172
    ASSERT(!pVnode->blocked);
173 174 175 176
    pVnode->blocked = true;
  }
  taosThreadMutexUnlock(&pVnode->lock);

177 178
  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
179
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
180
    }
181
  } else if (code < 0) {
182 183
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
184
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
185 186 187
    }
  }

188 189 190
  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

191
  for (int32_t i = 0; i < *arrSize; ++i) {
192 193 194
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
195
    rpcFreeCont(pMsg->pCont);
196
    taosFreeQitem(pMsg);
197 198 199 200 201
  }

  *arrSize = 0;
}

202
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
203 204 205 206 207 208 209
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
210 211
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

212
  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
213
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
214 215 216
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

217
    const STraceId *trace = &pMsg->info.traceId;
218 219 220
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

221
    if (!pVnode->restored) {
222
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
223 224
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
225 226 227 228 229
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

230
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
231
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
232 233
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
234 235 236 237
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }
238

B
Benguang Zhao 已提交
239 240
    vnodeProposeCommitOnNeed(pVnode);

241
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
242
    if (code != 0) {
243 244 245 246
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
247 248
    }

249
    if (isBlock) {
250 251 252
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

253
    pMsgArr[arrayPos] = pMsg;
254 255 256
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

257
    if (isBlock || msg == numOfMsgs - 1) {
258
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
259
    }
260
  }
261 262 263

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
264 265
}

266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
#else

void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
284
      vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
285 286 287 288 289 290
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

B
Benguang Zhao 已提交
291 292
    vnodeProposeCommitOnNeed(pVnode);

293 294
    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
295
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

313
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
314
  SVnode  *pVnode = pInfo->ahandle;
315 316 317 318 319 320
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
321
    const STraceId *trace = &pMsg->info.traceId;
322

323
    if (vnodeIsMsgBlock(pMsg->msgType)) {
324 325 326 327 328 329 330
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
331 332
    }

S
Shengliang Guan 已提交
333 334
    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
335
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
S
Shengliang Guan 已提交
336
        rsp.code = terrno;
337 338
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
S
Shengliang Guan 已提交
339 340
      }
    }
341

342
    vnodePostBlockMsg(pVnode, pMsg);
S
Shengliang Guan 已提交
343
    if (rsp.info.handle != NULL) {
344
      tmsgSendRsp(&rsp);
L
Liu Jicong 已提交
345 346 347 348
    } else {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
349 350
    }

351
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
352 353
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
354
  }
M
Minghao Li 已提交
355 356
}

357
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
358 359 360
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

S
Shengliang Guan 已提交
361 362 363
  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code != 0) {
    vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
S
Shengliang Guan 已提交
364
            TMSG_INFO(pMsg->msgType), terrstr());
365 366 367 368 369
  }

  return code;
}

370
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
371
  if (pMsg == NULL || pMsg->pCont == NULL) {
372 373
    return -1;
  }
S
Shengliang Guan 已提交
374

375 376 377
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
378 379 380 381 382 383 384 385 386 387 388
    return -1;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

389
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
390 391 392 393 394 395 396
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
M
Minghao Li 已提交
397 398 399
    return -1;
  }

M
Minghao Li 已提交
400 401 402
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
403
    pMsg->pCont = NULL;
M
Minghao Li 已提交
404 405 406
  }
  return code;
}
M
Minghao Li 已提交
407

408
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
409 410 411 412 413 414 415
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
416

417
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
S
Shengliang Guan 已提交
418
  vnodeGetSnapshot(pFsm->data, pSnapshot);
M
Minghao Li 已提交
419 420
}

421
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
S
Shengliang Guan 已提交
422
  SVnode *pVnode = pFsm->data;
423 424
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;
M
Minghao Li 已提交
425

426
  const STraceId *trace = &pMsg->info.traceId;
427
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
428
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
429
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
430
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
M
Minghao Li 已提交
431

432 433 434 435
  return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}

static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
436
  if (pMsg->code == 0) {
437
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
438 439
  }

440 441
  const STraceId *trace = &pMsg->info.traceId;
  SVnode         *pVnode = pFsm->data;
442
  vnodePostBlockMsg(pVnode, pMsg);
443

444 445 446 447
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
448 449

  vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
450 451 452
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
M
Minghao Li 已提交
453 454
}

455
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
S
Shengliang Guan 已提交
456
  if (pMeta->isWeak == 1) {
457
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
458
  }
459
  return 0;
M
Minghao Li 已提交
460 461
}

462 463 464 465 466
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
  SVnode *pVnode = pFSM->data;
  return atomic_load_64(&pVnode->state.applied);
}

467
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
468
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
469
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
470 471
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
472 473
}

S
Shengliang Guan 已提交
474
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
475 476
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
477
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
478 479
  return code;
}
S
Shengliang Guan 已提交
480

481
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
482
  SVnode *pVnode = pFsm->data;
483
  vnodeSnapReaderClose(pReader);
484
}
S
Shengliang Guan 已提交
485

S
Shengliang Guan 已提交
486
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
487
  SVnode *pVnode = pFsm->data;
H
Hongze Cheng 已提交
488
  int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
489 490
  return code;
}
S
Shengliang Guan 已提交
491

S
Shengliang Guan 已提交
492
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
493 494
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
495 496 497

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
498
    if (itemSize == 0) {
S
Shengliang Guan 已提交
499
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
500 501
      break;
    } else {
S
Shengliang Guan 已提交
502
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
503 504 505 506
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
507 508 509
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
510

S
Shengliang Guan 已提交
511
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
M
Minghao Li 已提交
512
  SVnode *pVnode = pFsm->data;
513
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
514
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
515

516
  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
S
Shengliang Guan 已提交
517
  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
M
Minghao Li 已提交
518 519
  return code;
}
S
Shengliang Guan 已提交
520

S
Shengliang Guan 已提交
521
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
522
  SVnode *pVnode = pFsm->data;
523
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
524
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
525
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
526 527
  return code;
}
S
Shengliang Guan 已提交
528

529
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
530
  SVnode *pVnode = pFsm->data;
531
  SyncIndex appliedIdx = -1;
532 533

  do {
534 535 536
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    ASSERT(appliedIdx <= commitIdx);
    if (appliedIdx == commitIdx) {
537
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
538 539
      break;
    } else {
540 541 542
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
            ", applied-index:%" PRId64,
            pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
543 544 545 546
      taosMsleep(10);
    }
  } while (true);

547
  ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
548
  walApplyVer(pVnode->pWal, commitIdx);
549

550
  pVnode->restored = true;
551
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
552 553
}

S
Shengliang Guan 已提交
554
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
555
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
556
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
557

558
  taosThreadMutexLock(&pVnode->lock);
559 560
  if (pVnode->blocked) {
    pVnode->blocked = false;
561
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
562 563
    tsem_post(&pVnode->syncSem);
  }
564
  taosThreadMutexUnlock(&pVnode->lock);
565 566
}

S
Shengliang Guan 已提交
567
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
568 569 570
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
571

572 573
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return itemSize;
  } else {
    return -1;
  }
592 593
}

594
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
S
Shengliang Guan 已提交
595
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
M
Minghao Li 已提交
596
  pFsm->data = pVnode;
S
Shengliang Guan 已提交
597
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
598
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
S
Shengliang Guan 已提交
599 600
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
601
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
602
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
S
Shengliang Guan 已提交
603
  pFsm->FpLeaderTransferCb = NULL;
604
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
605
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
606 607
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
S
Shengliang Guan 已提交
608
  pFsm->FpReConfigCb = NULL;
S
Shengliang Guan 已提交
609 610 611 612 613 614 615
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

M
Minghao Li 已提交
616
  return pFsm;
617 618 619 620
}

int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
621
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
M
Minghao Li 已提交
622
      .batchSize = 1,
623 624 625
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
S
Shengliang Guan 已提交
626
      .msgcb = &pVnode->msgCb,
S
Shengliang Guan 已提交
627 628 629 630 631 632
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
633 634 635 636 637
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

S
Shengliang Guan 已提交
638 639 640 641
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
642 643
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId);
S
Shengliang Guan 已提交
644 645
  }

646 647 648 649 650 651 652 653 654
  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
655
int32_t vnodeSyncStart(SVnode *pVnode) {
S
Shengliang Guan 已提交
656
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
B
Benguang Zhao 已提交
657 658 659 660 661
  if (syncStart(pVnode->sync) < 0) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
    return -1;
  }
  return 0;
662 663
}

S
Shengliang Guan 已提交
664
void vnodeSyncPreClose(SVnode *pVnode) {
665
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
S
Shengliang Guan 已提交
666 667
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);
668

S
Shengliang Guan 已提交
669 670 671 672 673 674 675 676 677
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

678
void vnodeSyncPostClose(SVnode *pVnode) {
679
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
680 681 682
  syncPostStop(pVnode->sync);
}

S
Shengliang Guan 已提交
683
void vnodeSyncClose(SVnode *pVnode) {
S
Shengliang Guan 已提交
684
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
S
Shengliang Guan 已提交
685 686
  syncStop(pVnode->sync);
}
687

S
Shengliang Guan 已提交
688 689 690 691 692 693 694 695 696
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
697
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
698
#if 0  
699
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
700
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
701 702
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
703
#endif
704
      }
S
Shengliang Guan 已提交
705 706 707 708 709 710 711 712 713
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      tsem_post(&pVnode->syncSem);
    }
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

714 715 716 717
bool vnodeIsRoleLeader(SVnode *pVnode) {
  SSyncState state = syncGetState(pVnode->sync);
  return state.state == TAOS_SYNC_STATE_LEADER;
}
718

719
bool vnodeIsLeader(SVnode *pVnode) {
720
  terrno = 0;
721 722
  SSyncState state = syncGetState(pVnode->sync);

723 724 725 726 727 728 729 730
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
731 732 733
    return false;
  }

734 735 736
  if (!state.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
737 738
    return false;
  }
739 740

  return true;
L
Liu Jicong 已提交
741
}