From 163f5abc4ce1841a6b5f0dcde8a9505c22178af0 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Fri, 2 Jun 2023 17:22:49 +0800 Subject: [PATCH] fix: no data after seek --- source/client/src/clientTmq.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ccb09e3584..e1b2b9c48b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // update the local offset value only for the returned values, only when the local offset is NOT updated // by tmq_offset_seek function if (!pVg->seekUpdated) { + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); } // update the status @@ -1952,8 +1955,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate - pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; + // update the local offset value only for the returned values, only when the local offset is NOT updated + // by tmq_offset_seek function + if (!pVg->seekUpdated) { + if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); + pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); -- GitLab