提交 a56bec81 编写于 作者: 翊名 提交者: dinglei

feat(PullConsumer) support begin/end seek support for pull consumer

上级 c3b7d186
...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer; ...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore;
...@@ -35,12 +34,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -35,12 +34,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/** /**
* Consumers belonging to the same consumer group share a group id. The consumers in a group then * Consumers belonging to the same consumer group share a group id. The consumers in a group then divides the topic
* divides the topic as fairly amongst themselves as possible by establishing that each queue is only * as fairly amongst themselves as possible by establishing that each queue is only consumed by a single consumer
* consumed by a single consumer from the group. If all consumers are from the same group, it functions * from the group. If all consumers are from the same group, it functions as a traditional message queue. Each
* as a traditional message queue. Each message would be consumed by one consumer of the group only. * message would be consumed by one consumer of the group only. When multiple consumer groups exist, the flow of the
* When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional * data consumption model aligns with the traditional publish-subscribe model. The messages are broadcast to all
* publish-subscribe model. The messages are broadcast to all consumer groups. * consumer groups.
*/ */
private String consumerGroup; private String consumerGroup;
...@@ -266,6 +265,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -266,6 +265,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
return this.defaultLitePullConsumerImpl.committed(messageQueue); return this.defaultLitePullConsumerImpl.committed(messageQueue);
} }
@Override
public void updateNameServerAddress(String nameServerAddress) {
this.defaultLitePullConsumerImpl.updateNameServerAddr(nameServerAddress);
}
@Override
public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
}
@Override
public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
}
@Override @Override
public boolean isAutoCommit() { public boolean isAutoCommit() {
return autoCommit; return autoCommit;
......
...@@ -172,4 +172,27 @@ public interface LitePullConsumer { ...@@ -172,4 +172,27 @@ public interface LitePullConsumer {
*/ */
void registerTopicMessageQueueChangeListener(String topic, void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
/**
* Update name server addresses.
*/
void updateNameServerAddress(String nameServerAddress);
/**
* Overrides the fetch offsets with the begin offset that the consumer will use on the next poll. If this API is
* invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
*/
void seekToBegin(MessageQueue messageQueue)throws MQClientException;
/**
* Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this API is
* invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
*/
void seekToEnd(MessageQueue messageQueue)throws MQClientException;
} }
...@@ -16,16 +16,16 @@ ...@@ -16,16 +16,16 @@
*/ */
package org.apache.rocketmq.client.impl.consumer; package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.HashSet;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
...@@ -35,13 +35,12 @@ import java.util.concurrent.ScheduledExecutorService; ...@@ -35,13 +35,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
...@@ -164,6 +163,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -164,6 +163,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
} }
public void updateNameServerAddr(String newAddresses) {
this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
}
private synchronized void setSubscriptionType(SubscriptionType type) { private synchronized void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE) if (this.subscriptionType == SubscriptionType.NONE)
this.subscriptionType = type; this.subscriptionType = type;
...@@ -556,6 +559,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -556,6 +559,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
long begin = minOffset(messageQueue);
this.seek(messageQueue, begin);
}
public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
long begin = maxOffset(messageQueue);
this.seek(messageQueue, begin);
}
private long maxOffset(MessageQueue messageQueue) throws MQClientException { private long maxOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue); return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
...@@ -764,8 +777,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -764,8 +777,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL); topic, SubscriptionData.SUB_ALL);
} }
PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize()); PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
switch (pullResult.getPullStatus()) { switch (pullResult.getPullStatus()) {
case FOUND: case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue); final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册