diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 66ad0714bdbf216700ab467ac686d8916cb12e43..dcc3c86171bde9a5d7581f8d7238f5c831ad88ef 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -198,8 +198,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); -int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo); - SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d8e35f966f91b9545fe439ea14c6973e6f7ac313..19c8f8af3ee955244e3cd1f50d978b50e0bccdd1 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -156,36 +156,33 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } } - if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - if (qStreamExtractOffsetUid(task) != 0) { + // get meta + SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); + if (tmp->metaRspLen > 0) { + qStreamExtractOffset(task, &tmp->rspOffset); + *pMetaRsp = *tmp; + + tqDebug("tmqsnap task get data"); + break; + } + + if (pDataBlock == NULL) { + qStreamExtractOffset(task, pOffset); + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { continue; } - tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), - pHandle->snapshotVer + 1); + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); + qStreamExtractOffset(task, &pRsp->rspOffset); break; } if (pRsp->blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); + qStreamExtractOffset(task, &pRsp->rspOffset); break; } - - SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); - if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - *pOffset = tmp->rspOffset; - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; - tqDebug("tmqsnap task exec change to get data"); - continue; - } - - *pMetaRsp = *tmp; - tqDebug("tmqsnap task exec exited, get meta"); - break; } - qStreamExtractOffset(task, &pRsp->rspOffset); - return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 02cda670f03fc8fba202c44eeffa7ffaeadb3493..fa576329a6710d8003d7fd9f0438cc507bd07ada 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -993,11 +993,6 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.metaRsp; } -int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.currentOffset.uid; -} - void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e4e7f32c366714fb9b91b1eb396cd7174456d2f9..45ee6e9ea18b471bec4753f45949ae102f4c31fd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2087,17 +2087,16 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); + STqOffsetVal offset = {0}; if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->sContext->snapVersion); + tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); } else { - STqOffsetVal offset = {0}; tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN); - qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); } + qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); tDeleteSSchemaWrapper(mtInfo.schema); - qDebug("tmqsnap stream scan tsdb return null"); return NULL; } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) { SSnapContext* sContext = pInfo->sContext; @@ -2112,10 +2111,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } if (!sContext->queryMeta) { // change to get data next poll request - tqOffsetResetToData(&pTaskInfo->streamInfo.metaRsp.rspOffset, 0, INT64_MIN); + STqOffsetVal offset = {0}; + tqOffsetResetToData(&offset, 0, INT64_MIN); + qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); } else { tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid); - pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset; pTaskInfo->streamInfo.metaRsp.resMsgType = type; pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen; pTaskInfo->streamInfo.metaRsp.metaRsp = data;