未验证 提交 dc815cc5 编写于 作者: H Heng Du 提交者: GitHub

[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.

[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
上级 7760bb4a
...@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override @Override
public Long committed(MessageQueue messageQueue) throws MQClientException { public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue); return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
} }
@Override @Override
...@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override @Override
public void seekToBegin(MessageQueue messageQueue) throws MQClientException { public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToBegin(messageQueue); this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
} }
@Override @Override
public void seekToEnd(MessageQueue messageQueue) throws MQClientException { public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToEnd(messageQueue); this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
} }
@Override @Override
......
...@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException { public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2) if (offset == -2)
throw new MQClientException("Fetch consume offset from broker exception", null); throw new MQClientException("Fetch consume offset from broker exception", null);
return offset; return offset;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册