diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8e3f1aa82ae138864d0f0f5dde3c057c85948719..610db7774149ba55011dd81f26261c297364fbed 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -453,15 +453,6 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->pushLock); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - - // NOTE: this pHandle->consumerId may have been changed already. - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64", reqId:0x%"PRIx64, - pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts, pRequest->reqId); - - tDeleteSMqDataRsp(&dataRsp); return code; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index b002140a6c6e64771c3a7c43d7aaa3f51d8fe519..6d4a8a1c766d02e357d2ce6c856ecbd4630c6105 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -357,11 +357,10 @@ bool tqNextDataBlock2(STqReader* pReader) { return false; } - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, - pReader->msg2.ver, pReader->nextBlk); - int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { + tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, + pReader->msg2.ver, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) return true; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef0f3917093a66e3a086de8c97f38f80945a2ea1..31f022f368b661682897020630a73b081cf6877d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1628,7 +1628,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { SFetchRet ret = {0}; tqNextBlock(pInfo->tqReader, &ret); - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion); + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); if (ret.fetchType == FETCH_TYPE__DATA) { qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);