提交 163f5abc 编写于 作者: T t_max

fix: no data after seek

上级 70699dde
...@@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the local offset value only for the returned values, only when the local offset is NOT updated // update the local offset value only for the returned values, only when the local offset is NOT updated
// by tmq_offset_seek function // by tmq_offset_seek function
if (!pVg->seekUpdated) { if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
} }
// update the status // update the status
...@@ -1952,8 +1955,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1952,8 +1955,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate // update the local offset value only for the returned values, only when the local offset is NOT updated
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; // by tmq_offset_seek function
if (!pVg->seekUpdated) {
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
} }
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册