diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a1ae1e429dd5dbb18f6521b263576e7096482327..bef7301a07720ff5ecb53249d7fbcf3013783027 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); +SWalRef *walRefFirstVer(SWal *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1d5fae33eb8049633c8c4b568de98da68ead3d68..7649e8a00613474a9698fa5fa2d3478bdbef9bd9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -521,7 +521,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } } else { - tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); + int64_t firstVer = walGetFirstVer(pTq->pVnode->pWal); + walRefVer(pHandle->pRef, firstVer); + tqOffsetResetToLog(&fetchOffsetNew, firstVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index e86111109ce4fc9768be8b662670ae711f935e39..f5cfe9abaee4e1e964189c6058afcdd4d8b28b73 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -77,6 +77,29 @@ void walUnrefVer(SWalRef *pRef) { } #endif +SWalRef *walRefFirstVer(SWal *pWal) { + SWalRef *pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } + taosThreadMutexLock(&pWal->mutex); + + int64_t ver = walGetFirstVer(pWal); + + wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + return pRef; +} + SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *pRef = walOpenRef(pWal); if (pRef == NULL) { @@ -86,6 +109,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) { int64_t ver = walGetCommittedVer(pWal); + wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); + pRef->refVer = ver; // bsearch in fileSet SWalFileInfo tmpInfo;