diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index dcc3c86171bde9a5d7581f8d7238f5c831ad88ef..66ad0714bdbf216700ab467ac686d8916cb12e43 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -198,6 +198,8 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); +int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo); + SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8082e74d346ed39f478198e682b4ee1c8b695ee8..653eb2b9c4cebdf8b579d2abfac795f2f5950c83 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -486,9 +486,9 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe } tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64, + ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, - taosxRsp.rspOffset.version); + taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index b5434e0b0148682b3116fa9f03eaf756a3f85850..d8e35f966f91b9545fe439ea14c6973e6f7ac313 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -157,6 +157,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (qStreamExtractOffsetUid(task) != 0) { + continue; + } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); break; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fa576329a6710d8003d7fd9f0438cc507bd07ada..02cda670f03fc8fba202c44eeffa7ffaeadb3493 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -993,6 +993,11 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.metaRsp; } +int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.currentOffset.uid; +} + void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));