diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 8ae1c8720ca6a37ba46e942febc0433dd091d282..828341ddd8666bbb55c2cedce1b34c50f6b3362a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -121,6 +121,7 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; + int64_t walLogLastVer; SRWLatch pushLock; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 052ab084536accd74084706cef6fd146efedbdfb..23c8e7e037ed0bd54bab4cbb3699aff3ac193510 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -201,6 +201,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqCheckLogInWal(STQ* pTq, int64_t version); SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a941b3c2027003d0465910d884c3d1222ca58a3..045b497371093f443a446a43e8acb07559ea13d1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -80,6 +80,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } pTq->path = strdup(path); pTq->pVnode = pVnode; + pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); @@ -1536,3 +1537,5 @@ FAIL: taosFreeQitem(pMsg); return -1; } + +int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0668a01e32c8efb6ad3f1656c35dfc96da00e19b..fe9aad4a206cad1d49198e8fd3732d9e2f419c0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -197,6 +197,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; + if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) { + if (tqCheckLogInWal(pVnode->pTq, version)) return 0; + } + // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead);