vnodeSync.c 22.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
#define BATCH_ENABLE 0
20

21
/* Decide whether a message may be proposed as a "weak" entry. The message
 * type is currently ignored and every message is reported as non-weak. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
22

23
/*
 * Park the calling thread on pVnode->syncSem after a blocking message has been
 * proposed. In this file the semaphore is posted by vnodePostBlockMsg,
 * vnodeBecomeFollower, vnodeSyncPreClose and vnodeSyncCheckTimeout.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  int32_t         vgId = pVnode->config.vgId;
  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace

  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, vgId, pMsg, TMSG_INFO(pMsg->msgType),
          pVnode->blockSec, pVnode->blockSeq);
  tsem_wait(&pVnode->syncSem);
}

30 31
/*
 * Wake the thread parked in vnodeWaitBlockMsg once a blocking message has been
 * applied. Non-blocking message types are ignored. Under pVnode->lock, the block
 * state (blocked/blockSec/blockSeq) is reset and the semaphore is posted at
 * most once.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

46
/*
 * Answer pMsg with a redirect so the client can retry on another replica.
 *
 * The response body is the serialized retry endpoint set returned by
 * syncGetRetryEpSet(). The response type is pMsg->msgType + 1, and its info
 * is copied from pMsg after pMsg->info.hasEpSet has been set to 1.
 *
 * @param code  error reported to the client; 0 means TSDB_CODE_SYN_NOT_LEADER.
 *
 * If the response body cannot be allocated, the response is sent with
 * TSDB_CODE_OUT_OF_MEMORY instead of the redirect code. The request keeps
 * receiving the same code for callers that inspect pMsg->code.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);  // size-only pass

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    // The error must travel on the response that is actually sent. Setting it
    // only on the request made the client see a redirect with an empty body.
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

75 76 77 78 79 80 81 82 83
/*
 * Apply pMsg directly through vnodeProcessWriteMsg (used when the propose call
 * returned a positive value).
 *
 * A response is sent when the message carries an RPC handle. Otherwise any
 * response body produced by the write is released here.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGError
    rsp.code = terrno;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle == NULL) {
    // no RPC handle to answer: just drop the body, if one was produced
    if (rsp.pCont != NULL) rpcFreeCont(rsp.pCont);
    return;
  }

  tmsgSendRsp(&rsp);
}

/*
 * Report a failed proposal of pMsg back to its sender.
 *
 * - TSDB_CODE_SYN_NOT_LEADER and TSDB_CODE_SYN_RESTORING are answered with a
 *   redirect (vnodeRedirectRpcMsg).
 * - Any other code is logged and, if the message has an RPC handle, returned
 *   in an empty response.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  bool shouldRedirect = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (shouldRedirect) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGError
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  if (pMsg->info.handle == NULL) return;

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  tmsgSendRsp(&rsp);
}

104 105
#if BATCH_ENABLE

106
/*
 * Propose the first *arrSize messages of pMsgArr to sync as one batch, then
 * free all of them and reset *arrSize to 0. Nothing happens when *arrSize <= 0.
 *
 * The caller (vnodeProposeWriteMsg) flushes before and after every blocking
 * message, so only the last element of a batch can be a blocking one. When
 * the batch is accepted (code == 0) and that element is blocking, this thread
 * waits in vnodeWaitBlockMsg until it is released.
 *
 * Fixes applied here:
 * - use vnodeIsMsgBlock, the predicate defined for the rest of this file (the
 *   previous vnodeIsBlockMsg does not exist);
 * - record blockSec when blocking, as vnodeProposeMsg does. Otherwise
 *   vnodeSyncCheckTimeout measures the delay from a stale or zero start time.
 *   The batch API exposes no sequence number, so blockSeq is left at 0.
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = 0;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  // must happen before the loop below frees pLastMsg
  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

144
/*
 * Batch variant of the vnode-write queue consumer (compiled only when
 * BATCH_ENABLE is non-zero).
 *
 * Drains up to numOfMsgs items from qall. Messages are rejected (error reply,
 * then freed) when:
 * - the vnode has not finished restoring;
 * - the batch arrays could not be allocated;
 * - vnodePreProcessWriteMsg fails.
 *
 * Accepted messages are collected and proposed through vnodeProposeBatchMsg.
 * A blocking message always forms a batch of its own.
 *
 * Fixes applied here:
 * - pre-process failures now answer the client through vnodeHandleProposeError,
 *   as the non-batch variant does, instead of being dropped silently;
 * - the pending batch is flushed after the loop. Previously it was flushed only
 *   when the last loop index carried a valid message. If the final iterations
 *   were skipped, the collected messages were never proposed, answered or freed.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace/vGError
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // a blocking message is proposed alone: flush whatever is pending first
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // Propose the remainder. This is a no-op when nothing is pending
  // (arrayPos == 0), including the allocation-failure case.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

206 207 208
#else

/*
 * Propose a single write message to sync and act on the result.
 *
 * Handling of the syncPropose() result:
 * - > 0: the message is applied right here through vnodeHandleWriteMsg.
 * - < 0: terrno (when set) replaces the code, and the client is answered
 *   through vnodeHandleProposeError.
 * - == 0 for a blocking message type: the block state is recorded under
 *   pVnode->lock and this thread waits in vnodeWaitBlockMsg afterwards.
 *
 * @return the (possibly terrno-adjusted) propose code.
 */
static inline int32_t vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;
  bool    mustWait = false;

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  if (code == 0 && vnodeIsMsgBlock(pMsg->msgType)) {
    mustWait = true;
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
#if 0
    pVnode->blockInfo = pMsg->info;
#endif
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  } else if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  }

  if (mustWait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}

/*
 * Consumer of the vnode-write queue (non-batch variant).
 *
 * Each of up to numOfMsgs items from qall is handled in one of three ways:
 * - rejected with TSDB_CODE_SYN_RESTORING while the vnode is not restored;
 * - rejected with the pre-process error when vnodePreProcessWriteMsg fails;
 * - otherwise proposed through vnodeProposeMsg.
 *
 * In every case the message body and queue item are freed afterwards.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode *pVnode = pInfo->ahandle;
  int32_t vgId = pVnode->config.vgId;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace/vGError
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    int32_t code = 0;
    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else if ((code = vnodePreProcessWriteMsg(pVnode, pMsg)) != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
    } else {
      code = vnodeProposeMsg(pVnode, pMsg, isWeak);
      vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    }

    // single release point shared by all three outcomes above
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

279
/*
 * Consumer of the vnode-apply queue.
 *
 * For each of up to numOfMsgs items from qall:
 * 1. If the message arrived with code 0, apply it with vnodeProcessWriteMsg at
 *    pMsg->info.conn.applyIndex.
 * 2. Release a proposer waiting on it (vnodePostBlockMsg).
 * 3. Answer the client when an RPC handle is present; otherwise drop the
 *    response body.
 * 4. Free the message.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode *pVnode = pInfo->ahandle;
  int32_t vgId = pVnode->config.vgId;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace/vGError

    if (!vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    // a message that already carries an error code is not applied, only answered
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont != NULL) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

323
/*
 * Hand a sync-protocol message to the sync module.
 *
 * @param pRsp  unused by this implementation.
 * @return the syncProcessMsg() result; a non-zero value is also logged.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  (void)pRsp;
  int32_t         vgId = pVnode->config.vgId;
  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace/vGError

  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code == 0) return 0;

  vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", vgId, pMsg, TMSG_INFO(pMsg->msgType),
          terrstr());
  return code;
}

336
/*
 * Enqueue pMsg on SYNC_CTRL_QUEUE.
 *
 * @return -1 when pMsg or its body is NULL (nothing is freed in that case),
 *         -1 when no queue callback is available, otherwise the
 *         tmsgPutToQueue() result. On any failure after the NULL check, the
 *         message body is freed and pMsg->pCont is cleared.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  int32_t code = -1;
  bool    canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  if (canEnqueue) {
    code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

355
/*
 * Enqueue pMsg on SYNC_QUEUE.
 *
 * @return -1 when pMsg or its body is NULL (nothing is freed in that case),
 *         -1 when no queue callback is available, otherwise the
 *         tmsgPutToQueue() result. On any failure after the NULL check, the
 *         message body is freed and pMsg->pCont is cleared.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  int32_t code = -1;
  bool    canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  if (canEnqueue) {
    code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
373

374
/*
 * Send pMsg as a request to pEpSet. If sending fails, the message body is
 * freed and pMsg->pCont cleared before the error code is returned.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
382

383
/* FSM callback: fill pSnapshot with the vnode's snapshot info via vnodeGetSnapshot. */
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
}

387
/*
 * Stamp pMsg with the log index/term from pMeta and hand it to the vnode's
 * APPLY_QUEUE, which is consumed by vnodeApplyWriteMsg.
 *
 * @return the tmsgPutToQueue() result.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;  // consumed implicitly by vGTrace

  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));

  return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}

401 402
/* FSM commit callback: committed entries go through the common apply path. */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  int32_t code = vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  return code;
}

405
/*
 * FSM pre-commit callback. Only entries flagged weak (isWeak == 1) are applied
 * at this stage; everything else is a no-op returning 0.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  return (pMeta->isWeak == 1) ? vnodeSyncApplyMsg(pFsm, pMsg, pMeta) : 0;
}

412
/* FSM rollback callback: currently only traces the rolled-back entry. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  int32_t vgId = pVnode->config.vgId;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vgId,
         pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
419
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
420 421
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
422
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
423 424
  return code;
}
S
Shengliang Guan 已提交
425

S
Shengliang Guan 已提交
426
/* FSM callback: close a reader opened by vnodeSnapshotStartRead. */
static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;
  return vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
431

S
Shengliang Guan 已提交
432
/* FSM callback: read the next snapshot chunk into *ppBuf / *len. */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
S
Shengliang Guan 已提交
437

S
Shengliang Guan 已提交
438
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
439 440
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
441 442 443

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
444
    if (itemSize == 0) {
S
Shengliang Guan 已提交
445
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
446 447
      break;
    } else {
S
Shengliang Guan 已提交
448
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
449 450 451 452
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
453 454 455
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
456

S
Shengliang Guan 已提交
457
/*
 * FSM callback: close the snapshot writer. `!isApply` is forwarded as the
 * second argument of vnodeSnapWriterClose, together with pSnapshot.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, vgId,
        isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", vgId, code);
  return code;
}
S
Shengliang Guan 已提交
466

S
Shengliang Guan 已提交
467
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
468
  SVnode *pVnode = pFsm->data;
S
Shengliang Guan 已提交
469
  vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
470
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
S
Shengliang Guan 已提交
471
  vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
472 473
  return code;
}
S
Shengliang Guan 已提交
474

S
Shengliang Guan 已提交
475
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
476
  SVnode *pVnode = pFsm->data;
477 478 479

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
480
    if (itemSize == 0) {
481 482 483
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
S
Shengliang Guan 已提交
484
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
485 486 487 488
      taosMsleep(10);
    }
  } while (true);

489 490
  walApplyVer(pVnode->pWal, pVnode->state.applied);

491
  pVnode->restored = true;
492
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
493 494
}

S
Shengliang Guan 已提交
495
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
496
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
497
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
498

499
  taosThreadMutexLock(&pVnode->lock);
500 501
  if (pVnode->blocked) {
    pVnode->blocked = false;
502
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
503 504
    tsem_post(&pVnode->syncSem);
  }
505
  taosThreadMutexUnlock(&pVnode->lock);
506 507
}

S
Shengliang Guan 已提交
508
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
509 510 511
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
512

513 514
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: number of items in APPLY_QUEUE, or -1 when the vnode or its
 * queue-size callback is unavailable.
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  if (pVnode == NULL || pVnode->msgCb.qsizeFp == NULL) return -1;

  return tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
}

535
/*
 * Allocate the SSyncFSM callback table that binds the sync module to pVnode.
 *
 * @return the new table, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
 *         allocation fails. Previously the result was dereferenced unchecked.
 *
 * NOTE(review): the table is handed to syncOpen via SSyncInfo.pFsm. Who frees
 * it is decided outside this file; confirm.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;  // explicit: not supported by the vnode
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = NULL;  // explicit: not supported by the vnode
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Create the sync node for pVnode and store its handle in pVnode->sync.
 *
 * The sync directory is "<path><sep>sync". The FSM callbacks come from
 * vnodeSyncMakeFsm. The timing constants are passed as-is:
 * ping 5000 ms, election 4000 ms, heartbeat 700 ms.
 *
 * @return 0 on success, -1 on failure (logged with terrstr()). Failure covers
 *         both an FSM allocation failure, now checked instead of passing NULL
 *         to syncOpen, and a failing syncOpen.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // the only failure mode of the FSM constructor is allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
594
/* Start the sync node. Returns 0 on success, -1 (logged) on failure. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, start sync", vgId);

  if (syncStart(pVnode->sync) >= 0) return 0;

  vError("vgId:%d, failed to start sync subsystem since %s", vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
603 604 605 606
/*
 * First phase of shutting down sync. Calls syncLeaderTransfer and then
 * syncPreStop. Afterwards, under pVnode->lock, releases any proposer thread
 * still parked on a blocking message.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, pre close sync", vgId);

  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

  taosThreadMutexLock(&pVnode->lock);
  bool wasBlocked = pVnode->blocked;
  if (wasBlocked) {
    vInfo("vgId:%d, post block after close sync", vgId);
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
617
/* Final phase of shutting down sync: stop the sync node. */
void vnodeSyncClose(SVnode *pVnode) {
  const int32_t vgId = pVnode->config.vgId;
  vInfo("vgId:%d, close sync", vgId);
  syncStop(pVnode->sync);
}
621

S
Shengliang Guan 已提交
622 623 624 625 626 627 628 629 630
/*
 * Watchdog for blocking proposals.
 *
 * If a proposer has been blocked for more than VNODE_TIMEOUT_SEC seconds
 * (measured from pVnode->blockSec), this function:
 * - asks sync to send a timeout response for blockSeq;
 * - resets the block state;
 * - posts the semaphore so the proposer resumes.
 *
 * All of this runs under pVnode->lock.
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;
  vTrace("vgId:%d, check sync timeout msg", vgId);

  taosThreadMutexLock(&pVnode->lock);

  int32_t curSec = 0;
  int32_t delta = 0;
  bool    expired = false;
  if (pVnode->blocked) {
    curSec = taosGetTimestampSec();
    delta = curSec - pVnode->blockSec;
    expired = (delta > VNODE_TIMEOUT_SEC);
  }

  if (expired) {
    vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64, vgId,
           pVnode->blockSec, curSec, delta, pVnode->blockSeq);
    if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
      SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
      vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
             rpcMsg.info.handle, rpcMsg.info.ahandle);
      rpcSendResponse(&rpcMsg);
#endif
    }
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }

  taosThreadMutexUnlock(&pVnode->lock);
}

648 649 650 651
/* True when the sync state reports TAOS_SYNC_STATE_LEADER (restore state is not checked). */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
652

653
/*
 * True only when this vnode is leader AND both sync and the vnode are restored.
 *
 * On a false result, terrno describes why:
 * - whatever syncGetState set (logged as "stopping");
 * - TSDB_CODE_SYN_NOT_LEADER;
 * - TSDB_CODE_SYN_RESTORING.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;

  terrno = 0;  // cleared so a failure inside syncGetState can be detected
  SSyncState state = syncGetState(pVnode->sync);
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  bool isLeader = (state.state == TAOS_SYNC_STATE_LEADER);
  if (!isLeader) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(state.state));
    return false;
  }

  bool isRestored = state.restored && pVnode->restored;
  if (!isRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, state.restored, pVnode->restored);
    return false;
  }

  return true;
}