From ee32620808441e640ac22b5d65769a8323553754 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 16:22:30 +0800 Subject: [PATCH] fix:[TS-3347]set ver to first version if version stored is smaller than first version in wal when subscribe db --- include/libs/executor/executor.h | 2 ++ source/dnode/vnode/src/tq/tqUtil.c | 1 + source/libs/executor/src/executor.c | 15 +++++++++------ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1fb00e743f..b7e6c42e3b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..dba363122a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { + verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..1c87619e84 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1083,12 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // let's seek to the next version in wal file - int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } - + verifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; -- GitLab