From 2b2e1c79c9dc5129ab13abab093791612d87efd5 Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Mon, 1 Aug 2022 19:40:38 +0800
Subject: [PATCH] fix: write snapshot after apply queue is empty

---
Review note: the "items in apply queue" debug log in vnodeSnapshotStartWrite
now passes itemSize, so both %d conversions in its format string have a
matching argument. Hunk line counts are unchanged.

 source/dnode/vnode/src/vnd/vnodeSync.c | 33 +++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 98e1716d9c..13b45f3164 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -30,6 +30,7 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
   if (vnodeIsMsgBlock(pMsg->msgType)) {
     const STraceId *trace = &pMsg->info.traceId;
     vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
+    pVnode->blockCount = 1;
     tsem_wait(&pVnode->syncSem);
   }
 }
@@ -37,8 +38,11 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
 static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
   if (vnodeIsMsgBlock(pMsg->msgType)) {
     const STraceId *trace = &pMsg->info.traceId;
-    vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
-    tsem_post(&pVnode->syncSem);
+    if (pVnode->blockCount) {
+      vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
+      pVnode->blockCount = 0;
+      tsem_post(&pVnode->syncSem);
+    }
   }
 }
 
@@ -281,14 +285,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
   for (int32_t i = 0; i < numOfMsgs; ++i) {
     if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
     const STraceId *trace = &pMsg->info.traceId;
-    vGInfo("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld", vgId, pMsg,
-           TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
+    vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
+            TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
 
     SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
     if (rsp.code == 0) {
       if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
         rsp.code = terrno;
-        vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
+        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
+                pMsg->info.conn.applyIndex);
       }
     }
 
@@ -297,7 +302,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
       tmsgSendRsp(&rsp);
     }
 
-    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
+    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
     rpcFreeCont(pMsg->pCont);
     taosFreeQitem(pMsg);
   }
@@ -611,6 +616,18 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
 #ifdef USE_TSDB_SNAPSHOT
   SVnode *pVnode = pFsm->data;
   SSnapshotParam *pSnapshotParam = pParam;
+
+  do {
+    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
+    if (itemSize == 0) {
+      vDebug("vgId:%d, apply queue is empty, start write snapshot", pVnode->config.vgId);
+      break;
+    } else {
+      vDebug("vgId:%d, %d items in apply queue, write snapshot later", pVnode->config.vgId, itemSize);
+      taosMsleep(10);
+    }
+  } while (true);
+
   int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
   return code;
 #else
@@ -622,7 +639,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
 static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
 #ifdef USE_TSDB_SNAPSHOT
   SVnode *pVnode = pFsm->data;
+  vDebug("vgId:%d, stop write snapshot, isApply:%d", pVnode->config.vgId, isApply);
+
   int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
+  vDebug("vgId:%d, apply snapshot to vnode, code:0x%x", pVnode->config.vgId, code);
   return code;
 #else
   taosMemoryFree(pWriter);
@@ -634,6 +654,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
 #ifdef USE_TSDB_SNAPSHOT
   SVnode *pVnode = pFsm->data;
   int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
+  vTrace("vgId:%d, write snapshot, len:%d", pVnode->config.vgId, len);
   return code;
 #else
   return 0;
-- 
GitLab