From 6fae38619d201cbaec882dbf07b80e62a0722b1b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 22 Mar 2023 19:36:12 +0800 Subject: [PATCH] fix:error in TD-23218 & remove useless logic --- source/dnode/vnode/src/tq/tq.c | 9 --------- source/dnode/vnode/src/tq/tqRead.c | 5 ++--- source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8e3f1aa82a..610db77741 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -453,15 +453,6 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->pushLock); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - - // NOTE: this pHandle->consumerId may have been changed already. - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64", reqId:0x%"PRIx64, - pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts, pRequest->reqId); - - tDeleteSMqDataRsp(&dataRsp); return code; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index b002140a6c..6d4a8a1c76 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -357,11 +357,10 @@ bool tqNextDataBlock2(STqReader* pReader) { return false; } - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, - pReader->msg2.ver, pReader->nextBlk); - int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { + tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, + pReader->msg2.ver, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) return true; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef0f391709..31f022f368 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1628,7 +1628,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { SFetchRet ret = {0}; tqNextBlock(pInfo->tqReader, &ret); - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion); + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); if (ret.fetchType == FETCH_TYPE__DATA) { qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); -- GitLab