diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index ef76cfd4d5195569d64b36cf2e6065e2758fb17a..6718eb55fd8730ac9ff0187f4f0b54fc48b932fa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public Long committed(MessageQueue messageQueue) throws MQClientException { - return this.defaultLitePullConsumerImpl.committed(messageQueue); + return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue)); } @Override @@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void seekToBegin(MessageQueue messageQueue) throws MQClientException { - this.defaultLitePullConsumerImpl.seekToBegin(messageQueue); + this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue)); } @Override public void seekToEnd(MessageQueue messageQueue) throws MQClientException { - this.defaultLitePullConsumerImpl.seekToEnd(messageQueue); + this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue)); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 8ad7a6b44b7fe697ad937339e622595a061cd7fe..f54078fcfdc687a650444e147d5cbb59779b6fe9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { public long committed(MessageQueue messageQueue) throws MQClientException { checkServiceState(); - long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); + long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); if (offset == -2) throw new MQClientException("Fetch consume offset from broker exception", null); return offset;