From a56bec8163dd16aa32c498b3b6a7100c612d91c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=8A=E5=90=8D?= Date: Mon, 25 Nov 2019 16:14:31 +0800 Subject: [PATCH] feat(PullConsumer) support begin/end seek support for pull consumer --- .../consumer/DefaultLitePullConsumer.java | 28 ++++++++++++---- .../client/consumer/LitePullConsumer.java | 23 +++++++++++++ .../consumer/DefaultLitePullConsumerImpl.java | 32 +++++++++++++------ 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 99976d55..09f7e204 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer; import java.util.Collection; import java.util.List; - import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.store.OffsetStore; @@ -35,12 +34,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; /** - * Consumers belonging to the same consumer group share a group id. The consumers in a group then - * divides the topic as fairly amongst themselves as possible by establishing that each queue is only - * consumed by a single consumer from the group. If all consumers are from the same group, it functions - * as a traditional message queue. Each message would be consumed by one consumer of the group only. - * When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional - * publish-subscribe model. The messages are broadcast to all consumer groups. + * Consumers belonging to the same consumer group share a group id. The consumers in a group then divides the topic + * as fairly amongst themselves as possible by establishing that each queue is only consumed by a single consumer + * from the group. If all consumers are from the same group, it functions as a traditional message queue. Each + * message would be consumed by one consumer of the group only. When multiple consumer groups exist, the flow of the + * data consumption model aligns with the traditional publish-subscribe model. The messages are broadcast to all + * consumer groups. */ private String consumerGroup; @@ -266,6 +265,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon return this.defaultLitePullConsumerImpl.committed(messageQueue); } + @Override + public void updateNameServerAddress(String nameServerAddress) { + this.defaultLitePullConsumerImpl.updateNameServerAddr(nameServerAddress); + } + + @Override + public void seekToBegin(MessageQueue messageQueue) throws MQClientException { + this.defaultLitePullConsumerImpl.seekToBegin(messageQueue); + } + + @Override + public void seekToEnd(MessageQueue messageQueue) throws MQClientException { + this.defaultLitePullConsumerImpl.seekToEnd(messageQueue); + } + @Override public boolean isAutoCommit() { return autoCommit; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java index d6e657ff..ce222880 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -172,4 +172,27 @@ public interface LitePullConsumer { */ void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; + + /** + * Update name server addresses. + */ + void updateNameServerAddress(String nameServerAddress); + + /** + * Overrides the fetch offsets with the begin offset that the consumer will use on the next poll. If this API is + * invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that + * you may lose data if this API is arbitrarily used in the middle of consumption. + * + * @param messageQueue + */ + void seekToBegin(MessageQueue messageQueue)throws MQClientException; + + /** + * Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this API is + * invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that + * you may lose data if this API is arbitrarily used in the middle of consumption. + * + * @param messageQueue + */ + void seekToEnd(MessageQueue messageQueue)throws MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 2c673a1b..f44eea7f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -16,16 +16,16 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; -import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.Collection; -import java.util.HashSet; import java.util.Properties; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,13 +35,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; -import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; @@ -164,6 +163,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); } + public void updateNameServerAddr(String newAddresses) { + this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses); + } + private synchronized void setSubscriptionType(SubscriptionType type) { if (this.subscriptionType == SubscriptionType.NONE) this.subscriptionType = type; @@ -556,6 +559,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + public void seekToBegin(MessageQueue messageQueue) throws MQClientException { + long begin = minOffset(messageQueue); + this.seek(messageQueue, begin); + } + + public void seekToEnd(MessageQueue messageQueue) throws MQClientException { + long begin = maxOffset(messageQueue); + this.seek(messageQueue, begin); + } + private long maxOffset(MessageQueue messageQueue) throws MQClientException { checkServiceState(); return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue); @@ -764,8 +777,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); } - PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize()); + + switch (pullResult.getPullStatus()) { case FOUND: final Object objLock = messageQueueLock.fetchLockObject(messageQueue); -- GitLab