vnodeSync.c 20.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
#define BATCH_ENABLE 0
20

21
/*
 * Returns true for the table-DDL message types (create/alter/drop table and
 * tag-value update). After proposing one of these, the write thread waits on
 * syncSem in vnodeWaitBlockMsg until it is posted again.
 */
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
      return true;
    default:
      return false;
  }
}
M
Minghao Li 已提交
25

26
/* No message type is currently proposed as "weak": the isWeak flag is always false. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
27

28 29
/*
 * For blocking message types only: mark the vnode as blocked and wait on
 * syncSem. The semaphore is posted by vnodePostBlockMsg, vnodeBecomeFollower
 * or vnodeSyncPreClose. If the vnode is already marked blocked, return
 * immediately without waiting.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;
  bool            mustWait = false;

  taosThreadMutexLock(&pVnode->lock);
  if (!pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = true;
    mustWait = true;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  // Wait outside the mutex so the poster can take the lock and post.
  if (mustWait) {
    tsem_wait(&pVnode->syncSem);
  }
}

43 44
/*
 * For blocking message types only: if the vnode is marked blocked, clear the
 * flag and post syncSem, waking the thread parked in vnodeWaitBlockMsg.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) return;

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

56 57 58 59 60 61 62 63 64 65 66 67 68
/*
 * Reply to pMsg with TSDB_CODE_SYN_NOT_LEADER and attach the retry endpoint set
 * obtained from sync. The reply type is the request type + 1.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          retryEpSet = {0};

  syncGetRetryEpSet(pVnode->sync, &retryEpSet);

  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          retryEpSet.numOfEps, retryEpSet.inUse);
  for (int32_t epIdx = 0; epIdx < retryEpSet.numOfEps; epIdx++) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, epIdx, retryEpSet.eps[epIdx].fqdn,
            retryEpSet.eps[epIdx].port);
  }

  // Mark the request as carrying an ep set before its info is copied into the reply.
  pMsg->info.hasEpSet = 1;

  SRpcMsg redirectRsp = {.code = TSDB_CODE_SYN_NOT_LEADER, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  tmsgSendRedirectRsp(&redirectRsp, &retryEpSet);
}

73 74 75 76 77 78 79 80 81
/*
 * Apply pMsg immediately at pMsg->info.conn.applyIndex. The propose helpers
 * call this when the sync propose call returns > 0. On failure the response
 * code becomes terrno. The response is sent if there is a handle; otherwise
 * any response body is freed.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  int32_t applyRet = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
  if (applyRet < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle == NULL) {
    // Nobody to reply to: release the response body, if any.
    if (rsp.pCont != NULL) {
      rpcFreeCont(rsp.pCont);
    }
  } else {
    tmsgSendRsp(&rsp);
  }
}

/*
 * Report a failed proposal for pMsg.
 * - NOT_LEADER / RESTORING: redirect the client to the retry ep set.
 * - Any other code: log it and reply with that code when a handle exists.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  bool redirectable = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (redirectable) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  if (pMsg->info.handle == NULL) return;

  SRpcMsg errRsp = {.code = code, .info = pMsg->info};
  tmsgSendRsp(&errRsp);
}

102 103
#if BATCH_ENABLE

104
/*
 * Propose the *arrSize accumulated messages to sync in one batch, then free
 * all of them and reset *arrSize to 0. Does nothing when the batch is empty.
 *   > 0 : every message is applied right away (vnodeHandleWriteMsg)
 *   == 0: wait on the last message if it is a blocking type
 *   < 0 : report the error (terrno if set) for every message
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  const int32_t count = *arrSize;
  if (count <= 0) return;

  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, count);

  if (code == 0) {
    vnodeWaitBlockMsg(pVnode, pMsgArr[count - 1]);
  } else if (code > 0) {
    for (int32_t k = 0; k < count; k++) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[k]);
    }
  } else {
    if (terrno != 0) code = terrno;
    for (int32_t k = 0; k < count; k++) {
      vnodeHandleProposeError(pVnode, pMsgArr[k], code);
    }
  }

  for (int32_t k = 0; k < count; k++) {
    SRpcMsg        *pMsg = pMsgArr[k];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

132
/*
 * Batched vnode-write queue consumer (compiled only when BATCH_ENABLE != 0).
 *
 * Drains up to numOfMsgs items. For each one:
 *   - reject it (and reply) if the vnode is not restored, the batch arrays
 *     could not be allocated, or pre-processing fails;
 *   - otherwise append it to the pending batch.
 * A blocking message is always proposed on its own: the pending batch is
 * flushed before it is appended, and again right after. Any batch still
 * pending at the end is flushed as well.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Reply to the client, as the non-batched path does, instead of
      // dropping the request silently.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // A blocking message must not share a batch with earlier messages.
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    // ...nor with later ones: propose it alone right away.
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // Flush whatever is still pending. The final loop iteration may have been
  // skipped (empty qitem or rejected message), so an in-loop "last message"
  // flush is not enough. This is a no-op when nothing is pending.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
#else

/*
 * Propose a single message to sync and act on the result:
 *   < 0 : report the error (terrno if set, otherwise the returned code)
 *   == 0: wait if the message is a blocking type
 *   > 0 : apply the message right away
 * Returns the (possibly terrno-replaced) code.
 */
static inline int32_t vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int32_t ret = syncPropose(pVnode->sync, pMsg, isWeak);

  if (ret < 0) {
    if (terrno != 0) ret = terrno;
    vnodeHandleProposeError(pVnode, pMsg, ret);
  } else if (ret == 0) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  } else {
    vnodeHandleWriteMsg(pVnode, pMsg);
  }

  return ret;
}

/*
 * Non-batched vnode-write queue consumer. Each message is rejected if the
 * vnode is not restored or pre-processing fails; otherwise it is proposed
 * individually via vnodeProposeMsg. Every message taken from the queue is
 * freed before moving on.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else {
      int32_t code = vnodePreProcessWriteMsg(pVnode, pMsg);
      if (code != 0) {
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
        if (terrno != 0) code = terrno;
        vnodeHandleProposeError(pVnode, pMsg, code);
      } else {
        code = vnodeProposeMsg(pVnode, pMsg, isWeak);
        vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
      }
    }

    // Single release point for every message drained from the queue.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

253
/*
 * Vnode-apply queue consumer. For each message it:
 *   1. applies it at pMsg->info.conn.applyIndex, unless pMsg->code already
 *      carries an error;
 *   2. wakes a writer blocked on this message type (vnodePostBlockMsg);
 *   3. replies when a handle exists, otherwise frees any response body;
 *   4. frees the message.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t n = 0; n < numOfMsgs; n++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle == NULL) {
      if (rsp.pCont != NULL) {
        rpcFreeCont(rsp.pCont);
      }
    } else {
      tmsgSendRsp(&rsp);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

289
/*
 * Hand a sync-layer message to syncProcessMsg and log when it fails.
 * pRsp is not used here.
 * Returns the result of syncProcessMsg.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t ret = syncProcessMsg(pVnode->sync, pMsg);
  if (ret == 0) return 0;

  vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), terrstr());
  return ret;
}

302
/*
 * Enqueue pMsg onto SYNC_CTRL_QUEUE.
 * Returns -1 if pMsg or its body is missing (nothing is freed in that case).
 * If msgcb is unusable or the enqueue fails, the body is freed, pMsg->pCont
 * is cleared, and a non-zero code is returned.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}

321
/*
 * Enqueue pMsg onto SYNC_QUEUE.
 * Returns -1 if pMsg or its body is missing (nothing is freed in that case).
 * If msgcb is unusable or the enqueue fails, the body is freed, pMsg->pCont
 * is cleared, and a non-zero code is returned.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
M
Minghao Li 已提交
339

340
/*
 * Send a sync request to pEpSet. If tmsgSendReq fails, the body is freed
 * here and pMsg->pCont is cleared. Returns the tmsgSendReq result.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
348

S
Shengliang Guan 已提交
349
/* FSM callback: fill pSnapshot from the owning vnode. Always returns 0. */
static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
  return 0;
}

354
/*
 * Forward a committed (or weak pre-committed) entry to the vnode-apply queue.
 * The log index/term from pMeta are stamped into pMsg->info.conn so that the
 * apply worker applies the entry at that index.
 *
 * The result of tmsgPutToQueue used to be ignored, so a failed enqueue of a
 * committed entry left no trace. It is now logged.
 * NOTE(review): ownership of pMsg->pCont on a failed enqueue is not visible
 * from this file. It is intentionally not freed here, to avoid a possible
 * double free — confirm against tmsgPutToQueue's contract.
 */
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (code != 0) {
    vGError("vgId:%d, msg:%p failed to put into vnode-apply queue since %s, code:0x%x, index:%" PRId64,
            pVnode->config.vgId, pMsg, tstrerror(code), code, pMeta->index);
  }
}

368
/* FSM commit callback: every committed entry is handed to vnodeSyncApplyMsg. */
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

372
/* FSM pre-commit callback: only entries flagged weak (isWeak == 1) are applied early. */
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) return;
  vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

378
/* FSM rollback callback: currently only traces the event; no state is changed. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  const SVnode *pVnode = pFsm->data;
  const int32_t vgId = pVnode->config.vgId;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vgId,
         pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
385
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
386 387
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
388
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
389 390
  return code;
}
S
Shengliang Guan 已提交
391

S
Shengliang Guan 已提交
392
/* Close a snapshot reader previously opened by vnodeSnapshotStartRead. */
static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
  return vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
397

S
Shengliang Guan 已提交
398
/* Read the next snapshot chunk; the buffer and its length are returned via ppBuf/len. */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
S
Shengliang Guan 已提交
403

S
Shengliang Guan 已提交
404
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
405 406
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
407 408 409

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
410
    if (itemSize == 0) {
S
Shengliang Guan 已提交
411
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
412 413
      break;
    } else {
S
Shengliang Guan 已提交
414
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
415 416 417 418
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
419 420 421
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
422

S
Shengliang Guan 已提交
423
/*
 * Close the snapshot writer. The second argument to vnodeSnapWriterClose is
 * the negation of isApply. Returns vnodeSnapWriterClose's code.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode       *pVnode = pFsm->data;
  const int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, vgId,
        isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  bool    notApplied = !isApply;
  int32_t ret = vnodeSnapWriterClose(pWriter, notApplied, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", vgId, ret);
  return ret;
}
S
Shengliang Guan 已提交
432

S
Shengliang Guan 已提交
433
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
434
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
435
  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
436
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
S
Shengliang Guan 已提交
437
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
438 439
  return code;
}
S
Shengliang Guan 已提交
440

S
Shengliang Guan 已提交
441
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
442
  SVnode *pVnode = pFsm->data;
443 444 445

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
446
    if (itemSize == 0) {
447 448 449
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
S
Shengliang Guan 已提交
450
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
451 452 453 454
      taosMsleep(10);
    }
  } while (true);

455 456
  walApplyVer(pVnode->pWal, pVnode->state.applied);

457
  pVnode->restored = true;
458
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
459 460
}

S
Shengliang Guan 已提交
461
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
462
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
463
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
464

465
  taosThreadMutexLock(&pVnode->lock);
466 467
  if (pVnode->blocked) {
    pVnode->blocked = false;
468
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
469 470
    tsem_post(&pVnode->syncSem);
  }
471
  taosThreadMutexUnlock(&pVnode->lock);
472 473
}

S
Shengliang Guan 已提交
474
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
475 476 477 478
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}

479 480
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * Number of items currently in the vnode-apply queue, or -1 when the size
 * cannot be queried (no vnode or no qsizeFp).
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  if (pVnode == NULL || pVnode->msgCb.qsizeFp == NULL) return -1;
  return tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
}

501
/*
 * Allocate the sync state-machine descriptor for pVnode and wire every FSM
 * callback to this file's implementations. Leader-transfer and reconfig
 * callbacks are left NULL.
 *
 * Returns the new SSyncFSM. On allocation failure it returns NULL and sets
 * terrno to TSDB_CODE_OUT_OF_MEMORY (previously it wrote through the NULL
 * pointer).
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;

  // Log-entry lifecycle.
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;

  // State / role notifications.
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = NULL;

  // Snapshot transfer.
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Build the sync configuration for pVnode and open its sync node.
 * The sync directory is "<path><sep>sync". Other settings:
 *   - WAL-first snapshot strategy, batchSize 1;
 *   - pingMs 5000, electMs 4000, heartbeatMs 700;
 *   - message callbacks from this file.
 *
 * Returns 0 on success, or -1 with terrno set on failure. Failure cases are:
 *   - the FSM descriptor could not be allocated (previously this unchecked
 *     pointer was passed to syncOpen);
 *   - syncOpen failed.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // The only allocation here is the FSM descriptor.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  // A non-positive handle signals failure.
  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
560
/* Start the vnode's sync node. Returns 0 on success, -1 if syncStart fails. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  int32_t ret = syncStart(pVnode->sync);
  if (ret >= 0) return 0;

  vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
569 570 571 572
/*
 * First phase of closing sync: request a leader transfer, pre-stop the sync
 * node, then release any write thread parked in vnodeWaitBlockMsg so it does
 * not wait forever.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, pre close sync", vgId);
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

#if 0
  while (syncSnapshotRecving(pVnode->sync)) {
    vInfo("vgId:%d, snapshot is recving", pVnode->config.vgId);
    taosMsleep(300);
  }
  while (syncSnapshotSending(pVnode->sync)) {
    vInfo("vgId:%d, snapshot is sending", pVnode->config.vgId);
    taosMsleep(300);
  }
#endif

  taosThreadMutexLock(&pVnode->lock);
  bool wasBlocked = pVnode->blocked;
  if (wasBlocked) {
    vInfo("vgId:%d, post block after close sync", vgId);
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
592
/* Final phase of closing sync: stop the vnode's sync node. */
void vnodeSyncClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, close sync", vgId);
  syncStop(pVnode->sync);
}
596

597 598 599 600
/*
 * True iff sync reports the LEADER role. Unlike vnodeIsLeader, it ignores
 * restore status and does not touch terrno.
 */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
601

602
/*
 * True only when this vnode can serve leader requests: it is the sync leader
 * and both the sync layer and the vnode report restore finished.
 * On false:
 *   - terrno is left as set by syncGetState if that call fails;
 *   - otherwise terrno is TSDB_CODE_SYN_NOT_LEADER or TSDB_CODE_SYN_RESTORING.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;

  // Clear terrno so that a failure inside syncGetState can be detected.
  terrno = 0;
  SSyncState st = syncGetState(pVnode->sync);
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  bool isLeader = (st.state == TAOS_SYNC_STATE_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(st.state));
    return false;
  }

  bool fullyRestored = st.restored && pVnode->restored;
  if (!fullyRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, st.restored, pVnode->restored);
    return false;
  }

  return true;
}