diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index cff4b0234c2c37c8dac82b174a916a926bdfd564..a95293a5b16b6c24368da179f102e131ff28e500 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,6 +36,7 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; + int64_t version; bool initMetaReader; bool initTableReader; bool initTqReader; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f58bbe0d688c6afb3f0f76e51d0ada940d919e84..8abaac6dffb15a656d78cd4454001ae8d28b8277 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -89,6 +89,8 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; + // TODO remove it + int64_t tsdbEndVer; } STqExecHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e0afc6c80fa43b80f35dc72f8f79b4ba3e6f92dc..ae0f7f56a2cccff84580453fc06e64e5650f73c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -483,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { /*for (int32_t i = 0; i < 5; i++) {*/ /*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ /*}*/ + int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; req.qmsg = NULL; @@ -493,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .vnode = pTq->pVnode, .initTableReader = true, .initTqReader = true, + .version = ver, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -501,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(scanner); pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader[i]); + pHandle->execHandle.tsdbEndVer = ver; } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { for (int32_t i = 0; i < 5; i++) { diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 49cf42b083b73c57aac671bbf72f63a51dc6acd8..3ee274ced1733a6604d5d8720cabc40604d4d98e 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -96,6 +96,12 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset } } + if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1); + qStreamPrepareScan(task, pOffset); + continue; + } + void* meta = qStreamExtractMetaMsg(task); if (meta != NULL) { // tq add meta to rsp @@ -107,7 +113,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset ASSERT(pRsp->rspOffset.type != 0); - if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e42510ceba262055e77a17448d44d401ff27ddf8..f7155b2431aec345d6ef8c78d9e7e0e75798be6a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1493,6 +1493,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle) { SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; + if (pHandle->version > 0) { + pSTInfo->cond.endVersion = pHandle->version; + } SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); if (pHandle->initTableReader) {