From dc815cc581f9e9dd36efa926fc7087ff1ecbf1a6 Mon Sep 17 00:00:00 2001 From: Heng Du Date: Fri, 3 Apr 2020 16:30:39 +0800 Subject: [PATCH] [ISSUE #1912]Polish the committed offset logic for the lite pull consumer. [ISSUE #1912]Polish the committed offset logic for the lite pull consumer. --- .../rocketmq/client/consumer/DefaultLitePullConsumer.java | 6 +++--- .../client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index ef76cfd4..6718eb55 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public Long committed(MessageQueue messageQueue) throws MQClientException { - return this.defaultLitePullConsumerImpl.committed(messageQueue); + return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue)); } @Override @@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void seekToBegin(MessageQueue messageQueue) throws MQClientException { - this.defaultLitePullConsumerImpl.seekToBegin(messageQueue); + this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue)); } @Override public void seekToEnd(MessageQueue messageQueue) throws MQClientException { - this.defaultLitePullConsumerImpl.seekToEnd(messageQueue); + this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue)); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 8ad7a6b4..f54078fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public long committed(MessageQueue messageQueue) throws MQClientException { checkServiceState(); - long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); + long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); if (offset == -2) throw new MQClientException("Fetch consume offset from broker exception", null); return offset; -- GitLab