From 0d56da7508baf110296ed7b88ff716dc35716e84 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 23 Dec 2022 19:04:39 +0800 Subject: [PATCH] fix: skip msg --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 3 +++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++++ 4 files changed, 9 insertions(+) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 8ae1c8720c..828341ddd8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -121,6 +121,7 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; + int64_t walLogLastVer; SRWLatch pushLock; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 052ab08453..23c8e7e037 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -201,6 +201,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqCheckLogInWal(STQ* pTq, int64_t version); SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a941b3c20..045b497371 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -80,6 +80,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } pTq->path = strdup(path); pTq->pVnode = pVnode; + pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); @@ -1536,3 +1537,5 @@ FAIL: taosFreeQitem(pMsg); return -1; } + +int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0668a01e32..fe9aad4a20 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -197,6 +197,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; + if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) { + if (tqCheckLogInWal(pVnode->pTq, version)) return 0; + } + // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); -- GitLab