vnodeSync.c 22.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

19
#define BATCH_ENABLE 0
20

21
/* Tells whether a message type should be proposed as "weak". No type is treated as weak at present. */
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
S
Shengliang Guan 已提交
22

23
/*
 * Logs that pMsg is about to block the caller, then waits on the vnode's
 * syncSem semaphore until some other code path posts it.
 */
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  // Not referenced directly below; presumably consumed by the vG* log macros.
  const STraceId *trace = &pMsg->info.traceId;

  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);

  tsem_wait(&pVnode->syncSem);
}

30 31
/*
 * If pMsg is a blocking message type and the vnode is currently marked as
 * blocked, clears the blocked state (flag, start second, sequence) and posts
 * syncSem. The state is inspected and changed while holding pVnode->lock.
 */
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

46
/*
 * Answers pMsg with a redirect response carrying the retry endpoint set
 * obtained from the sync module.
 *
 * pVnode: vnode whose sync handle supplies the endpoint set.
 * pMsg:   request being redirected; its info.hasEpSet is set to 1 and its
 *         info is copied into the response.
 * code:   response code; 0 is replaced by TSDB_CODE_SYN_NOT_LEADER.
 *
 * The response message type is pMsg->msgType + 1. If the response buffer
 * cannot be allocated, the response is still sent, without a body and with
 * code TSDB_CODE_OUT_OF_MEMORY.
 */
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  // Must be set before rsp is built below, because rsp.info is a copy of pMsg->info.
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  // First call with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    // The reply that is actually sent is rsp, so the failure has to be recorded there too;
    // previously only pMsg->code was updated and the peer never saw the error.
    // NOTE(review): rsp.info.hasEpSet is still 1 here although no endpoint set is attached — confirm
    // how the receiver treats that combination.
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

75 76 77 78 79 80 81 82 83
/*
 * Runs vnodeProcessWriteMsg on pMsg right away, using the apply index stored
 * in the message, and delivers the result: the response is sent when the
 * message carries an RPC handle, otherwise any response body is released.
 */
static inline void vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  int32_t ret = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
  if (ret < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
    return;
  }

  // Nobody to reply to: drop the response body if one was produced.
  if (rsp.pCont) {
    rpcFreeCont(rsp.pCont);
  }
}

/*
 * Reports a failed proposal to the sender of pMsg.
 *
 * TSDB_CODE_SYN_NOT_LEADER and TSDB_CODE_SYN_RESTORING are answered through
 * vnodeRedirectRpcMsg. Any other code is logged and, when the message has an
 * RPC handle, returned in a response with no body.
 */
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  bool redirect = (code == TSDB_CODE_SYN_NOT_LEADER) || (code == TSDB_CODE_SYN_RESTORING);
  if (redirect) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;
  vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);

  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle == NULL) {
    return;
  }
  tmsgSendRsp(&rsp);
}

104 105
#if BATCH_ENABLE

106
/*
 * Proposes the first *arrSize messages of pMsgArr to the sync module as one
 * batch, then frees every message in the batch and resets *arrSize to 0.
 *
 * pMsgArr:    messages to propose; each entry and its pCont are freed here.
 * pIsWeakArr: per-message weak flag, parallel to pMsgArr.
 * arrSize:    in: number of valid entries; out: 0. Nothing happens if <= 0.
 *
 * Handling of the syncProposeBatch return value:
 *   > 0  every message is processed immediately via vnodeHandleWriteMsg;
 *   < 0  every message gets an error reply (terrno is preferred when set);
 *   == 0 and the last message is a blocking type: the vnode is marked
 *        blocked and this function waits on the sync semaphore.
 */
static inline void vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // Use vnodeIsMsgBlock, the predicate every other function in this file calls
  // (this line previously referenced vnodeIsBlockMsg).
  bool wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  // The wait happens outside the lock so that the posting side can take it.
  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

144
/*
 * Write-queue worker (batch variant, compiled only when BATCH_ENABLE is set).
 *
 * Pulls up to numOfMsgs items from qall, pre-processes each one and collects
 * them into a batch that is handed to vnodeProposeBatchMsg. A blocking message
 * is always proposed in a batch of its own: the pending batch is flushed
 * before it is added and again right after.
 *
 * Messages that cannot be proposed (vnode not restored, batch arrays not
 * allocated, pre-processing failed) are answered with an error and freed here.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // Reply with the failure instead of silently dropping the request.
      if (terrno != 0) code = terrno;
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      // Flush whatever is pending so the blocking message ends up alone in its batch.
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // Final flush. Doing it after the loop (rather than on the last index inside it) also covers
  // the case where the last queue item was skipped by one of the `continue` paths above, which
  // used to leave the collected messages neither proposed nor freed. When allocation failed
  // arrayPos is still 0 and vnodeProposeBatchMsg returns immediately.
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

206 207 208
#else

/*
 * Proposes a single message to the sync module and deals with the outcome.
 *
 * Returns the syncPropose result, except that a negative result is replaced
 * by terrno when terrno is non-zero.
 *
 *   > 0  the message is processed immediately via vnodeHandleWriteMsg;
 *   < 0  an error reply is produced via vnodeHandleProposeError;
 *   == 0 and the message is a blocking type: the vnode is marked blocked
 *        (with start second and sequence recorded) under pVnode->lock, and
 *        the caller then waits on the sync semaphore.
 */
static inline int32_t vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;
  bool    wait = false;

  taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  if (code == 0 && vnodeIsMsgBlock(pMsg->msgType)) {
    wait = true;
    ASSERT(!pVnode->blocked);
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
#if 0
    pVnode->blockInfo = pMsg->info;
#endif
  }
  taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) {
      code = terrno;
    }
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  // Waiting is done after the lock is released.
  if (wait) {
    vnodeWaitBlockMsg(pVnode, pMsg);
  }
  return code;
}

/*
 * Write-queue worker (one-message-at-a-time variant).
 *
 * For each item taken from qall: reject it if the vnode has not finished
 * restoring, otherwise pre-process it and propose it through vnodeProposeMsg.
 * Every item, accepted or not, has its body and queue item freed here.
 */
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else {
      code = vnodePreProcessWriteMsg(pVnode, pMsg);
      if (code != 0) {
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
        if (terrno != 0) code = terrno;
        vnodeHandleProposeError(pVnode, pMsg, code);
      } else {
        code = vnodeProposeMsg(pVnode, pMsg, isWeak);
        vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
      }
    }

    // Common tail for all three outcomes above.
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

279
/*
 * Apply-queue worker.
 *
 * For each item taken from qall: if the message arrived without an error code
 * it is processed with vnodeProcessWriteMsg at its apply index; a waiter
 * blocked on this message type is released; the response is sent (or its body
 * freed when there is no RPC handle); finally the message itself is freed.
 */
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  SRpcMsg *pMsg = NULL;

  for (int32_t remaining = numOfMsgs; remaining > 0; --remaining) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;
    }

    const STraceId *trace = &pMsg->info.traceId;

    if (!vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    // A message that already carries an error code is not processed; the code is passed back as is.
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

323
/*
 * Hands a sync-protocol message to syncProcessMsg and returns its result,
 * logging an error when the result is non-zero. pRsp is not used here.
 */
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code == 0) {
    return code;
  }

  vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), terrstr());
  return code;
}

336
/*
 * Enqueues pMsg on SYNC_CTRL_QUEUE through msgcb.
 *
 * Returns -1 without touching anything when pMsg or its body is NULL.
 * Returns -1 when msgcb or its putToQueueFp is NULL, otherwise the result of
 * tmsgPutToQueue. On any non-zero result the message body is freed and
 * pMsg->pCont is cleared.
 */
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  int32_t code = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

355
/*
 * Enqueues pMsg on SYNC_QUEUE through msgcb.
 *
 * Returns -1 without touching anything when pMsg or its body is NULL.
 * Returns -1 when msgcb or its putToQueueFp is NULL, otherwise the result of
 * tmsgPutToQueue. On any non-zero result the message body is freed and
 * pMsg->pCont is cleared.
 */
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  int32_t code = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
373

374
/*
 * Sends pMsg to pEpSet with tmsgSendReq and returns its result. When the send
 * fails (non-zero), the message body is freed and pMsg->pCont is cleared.
 */
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
M
Minghao Li 已提交
382

383
/* FSM callback: fills pSnapshot by calling vnodeGetSnapshot on the vnode stored in pFsm->data. */
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vnodeGetSnapshot(pVnode, pSnapshot);
}

387
/*
 * Stamps pMsg with the index and term from pMeta and puts it on the vnode's
 * APPLY_QUEUE. Returns the result of tmsgPutToQueue.
 */
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode         *pVnode = pFsm->data;
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  return code;
}

401 402
/* FSM commit callback: forwards the message to vnodeSyncApplyMsg and returns its result. */
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  int32_t code = vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  return code;
}

405
/*
 * FSM pre-commit callback: only messages whose meta has isWeak == 1 are
 * forwarded to vnodeSyncApplyMsg at this stage; for all others this is a
 * no-op returning 0.
 */
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}

412
/* FSM rollback callback: only writes a trace line describing the rolled-back entry. */
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}

S
Shengliang Guan 已提交
419
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
420 421
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
H
Hongze Cheng 已提交
422
  int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader);
423 424
  return code;
}
S
Shengliang Guan 已提交
425

426
/*
 * FSM callback: closes the snapshot reader previously handed out through
 * vnodeSnapshotStartRead. pFsm is not needed for this and is ignored.
 */
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  (void)pFsm;  // the reader is self-contained; the unused pVnode local was removed
  vnodeSnapReaderClose(pReader);
}
S
Shengliang Guan 已提交
430

S
Shengliang Guan 已提交
431
/*
 * FSM callback: reads the next piece of snapshot data from pReader.
 *
 * ppBuf: receives the data pointer produced by vnodeSnapRead.
 * len:   receives the length reported by vnodeSnapRead.
 *
 * Returns the result of vnodeSnapRead. pFsm is not needed and is ignored.
 */
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  (void)pFsm;  // the unused pVnode local was removed
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
S
Shengliang Guan 已提交
436

S
Shengliang Guan 已提交
437
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
M
Minghao Li 已提交
438 439
  SVnode         *pVnode = pFsm->data;
  SSnapshotParam *pSnapshotParam = pParam;
440 441 442

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
443
    if (itemSize == 0) {
S
Shengliang Guan 已提交
444
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
445 446
      break;
    } else {
S
Shengliang Guan 已提交
447
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
448 449 450 451
      taosMsleep(10);
    }
  } while (true);

M
Minghao Li 已提交
452 453 454
  int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
  return code;
}
S
Shengliang Guan 已提交
455

S
Shengliang Guan 已提交
456
/*
 * FSM callback: closes the snapshot writer. vnodeSnapWriterClose receives the
 * negation of isApply as its second argument. Returns its result.
 */
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  bool    notApplied = !isApply;
  int32_t code = vnodeSnapWriterClose(pWriter, notApplied, pSnapshot);

  vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
  return code;
}
S
Shengliang Guan 已提交
465

S
Shengliang Guan 已提交
466
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
M
Minghao Li 已提交
467
  SVnode *pVnode = pFsm->data;
468
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
469
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
470
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
M
Minghao Li 已提交
471 472
  return code;
}
S
Shengliang Guan 已提交
473

S
Shengliang Guan 已提交
474
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
475
  SVnode *pVnode = pFsm->data;
476 477 478

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
S
Shengliang Guan 已提交
479
    if (itemSize == 0) {
480 481 482
      vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
      break;
    } else {
S
Shengliang Guan 已提交
483
      vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
484 485 486 487
      taosMsleep(10);
    }
  } while (true);

488 489
  walApplyVer(pVnode->pWal, pVnode->state.applied);

490
  pVnode->restored = true;
491
  vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
492 493
}

S
Shengliang Guan 已提交
494
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
495
  SVnode *pVnode = pFsm->data;
M
Minghao Li 已提交
496
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
497

498
  taosThreadMutexLock(&pVnode->lock);
499 500
  if (pVnode->blocked) {
    pVnode->blocked = false;
501
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
502 503
    tsem_post(&pVnode->syncSem);
  }
504
  taosThreadMutexUnlock(&pVnode->lock);
505 506
}

S
Shengliang Guan 已提交
507
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
508 509 510
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
}
511

512 513
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531

  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: returns the number of items in the vnode's apply queue, or -1
 * when there is no vnode or no queue-size callback.
 */
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;

  if (pVnode == NULL || pVnode->msgCb.qsizeFp == NULL) {
    return -1;
  }

  return tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
}

534
/*
 * Allocates an SSyncFSM and wires every callback to the vnode implementations
 * in this file; pFsm->data points back at pVnode.
 *
 * Returns the new FSM, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 * the allocation fails.
 */
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Previously the NULL result was dereferenced on the next line.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

/*
 * Opens the sync instance for pVnode.
 *
 * path: vnode directory; the sync data path is "<path><TD_DIRSEP>sync".
 *
 * Builds the SSyncInfo (WAL-first snapshot strategy, batch size 1, the
 * vnode's sync config, WAL and message callbacks, fixed ping/elect/heartbeat
 * intervals), creates the FSM and calls syncOpen. The resulting handle is
 * stored in pVnode->sync.
 *
 * Returns 0 on success, -1 if the FSM cannot be allocated or syncOpen fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL state machine to syncOpen.
    vError("vgId:%d, failed to create sync fsm since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    // NOTE(review): it is not visible here whether syncOpen takes ownership of syncInfo.pFsm on
    // failure; if it does not, the FSM allocated above is leaked on this path — confirm.
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  return 0;
}

B
Benguang Zhao 已提交
593
/* Starts the vnode's sync instance. Returns 0 on success, -1 when syncStart reports a negative result. */
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  int32_t ret = syncStart(pVnode->sync);
  if (ret >= 0) {
    return 0;
  }

  vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
602 603 604 605
/*
 * First stage of shutting sync down: requests a leader transfer, calls
 * syncPreStop, and then releases a writer that may still be blocked on the
 * sync semaphore (flag cleared and semaphore posted under pVnode->lock).
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, pre close sync", vgId);
  syncLeaderTransfer(pVnode->sync);
  syncPreStop(pVnode->sync);

  taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", vgId);
    pVnode->blocked = false;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

S
Shengliang Guan 已提交
616
/* Final stage of shutting sync down: logs and calls syncStop on the vnode's sync handle. */
void vnodeSyncClose(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;

  vInfo("vgId:%d, close sync", vgId);
  syncStop(pVnode->sync);
}
620

S
Shengliang Guan 已提交
621 622 623 624 625 626 627 628 629
/*
 * Timeout check for a blocked writer.
 *
 * When the vnode is blocked and more than VNODE_TIMEOUT_SEC seconds have
 * passed since blockSec, syncSendTimeoutRsp is called for the blocked
 * sequence, the blocked state is reset and the sync semaphore is posted.
 * Everything between reading and resetting the state runs under pVnode->lock.
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);

  taosThreadMutexLock(&pVnode->lock);
  if (!pVnode->blocked) {
    taosThreadMutexUnlock(&pVnode->lock);
    return;
  }

  int32_t curSec = taosGetTimestampSec();
  int32_t delta = curSec - pVnode->blockSec;
  if (delta > VNODE_TIMEOUT_SEC) {
    vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
           pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);

    // The failure branch is intentionally empty: its former body is compiled out below.
    if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0  
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
    }

    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    tsem_post(&pVnode->syncSem);
  }
  taosThreadMutexUnlock(&pVnode->lock);
}

647 648 649 650
/* Returns true when the sync state reported by syncGetState is TAOS_SYNC_STATE_LEADER. */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  SSyncState syncState = syncGetState(pVnode->sync);
  bool       isLeader = (syncState.state == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}
651

652
/*
 * Returns true only when this vnode is the sync leader and restore has
 * finished on both the sync side and the vnode side.
 *
 * terrno is cleared on entry. On a false result it holds either whatever
 * syncGetState left in it, TSDB_CODE_SYN_NOT_LEADER, or
 * TSDB_CODE_SYN_RESTORING, matching the reason that was logged.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  int32_t vgId = pVnode->config.vgId;

  terrno = 0;
  SSyncState syncState = syncGetState(pVnode->sync);

  // A non-zero terrno after syncGetState is reported as the vnode stopping.
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", vgId);
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", vgId, syncStr(syncState.state));
    return false;
  }

  bool fullyRestored = syncState.restored && pVnode->restored;
  if (!fullyRestored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}