提交 cfca4401 编写于 作者: K King 提交者: Heng Du

Polish LitePullConsumer (#1395)

* fix unsubscribe code

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* polish commit consumed offset

* pass checkstyle

* pass checkstyle

* polish LiteMQPullConsumer

* add flow control and polish commit logic

* fix bug

* polish code

* fix commit consumed offset back

* refactor litePullConsumer

* development save

* development save

* Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.

* Polish lite pull consumer

* polish lite pull consumer

* polish lite pull consumer

* fix seek

* fix seek function

* polish lite pull consumer

* add apache header

* add test

* polish test

* Make broadcast model work for litePullConsumer

* Revert example/broadcast/PushConsumer.java

* Add delay time when no new message

* Enable long polling mode

* Fix subscribe bug when rebalance

* Delete useless consumeMessageHook

* Implement TopicMessageQueueChangeListener interface

* Lite pull consumer support namespace

* Make sql92 filter work

* 1. Add javadoc for lite pull consumer.
2. Polish lite pul consumer parameters

* Modify lite pull consumer example

* Add apache header for LitePullConsumerAssign example.

* Add unit test for lite pull consumer

* Fix typo
上级 ddb07ab1
...@@ -77,28 +77,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -77,28 +77,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/ */
private boolean unitMode = false; private boolean unitMode = false;
private int maxReconsumeTimes = 16;
/** /**
* Maximum amount of time in minutes a message may block the consuming thread. * The flag for auto commit offset
*/ */
private long consumeTimeout = 15; private boolean autoCommit = true;
/** /**
* Is auto commit offset * Pull thread number
*/ */
private boolean autoCommit = true; private int pullThreadNums = 20;
private int pullThreadNumbers = 20;
/** /**
* Maximum commit offset interval time in seconds. * Maximum commit offset interval time in milliseconds.
*/ */
private long autoCommitInterval = 5; private long autoCommitIntervalMillis = 5 * 1000;
/** /**
* Maximum number of messages pulled each time. * Maximum number of messages pulled each time.
*/ */
private int pullBatchNums = 10; private int pullBatchSize = 10;
/** /**
* Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default. * Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
...@@ -132,22 +129,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -132,22 +129,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private long pollTimeoutMillis = 1000 * 5; private long pollTimeoutMillis = 1000 * 5;
/** /**
* Message pull delay in milliseconds * Default constructor.
*/ */
private long pullDelayTimeMills = 0;
public DefaultLitePullConsumer() { public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
} }
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultLitePullConsumer(final String consumerGroup) { public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null); this(null, consumerGroup, null);
} }
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(RPCHook rpcHook) { public DefaultLitePullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
} }
/**
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook); this(null, consumerGroup, rpcHook);
} }
...@@ -164,121 +175,229 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -164,121 +175,229 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
} }
/**
* Start the consumer
*/
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start(); this.defaultLitePullConsumerImpl.start();
} }
/**
* Shutdown the consumer
*/
@Override @Override
public void shutdown() { public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown(); this.defaultLitePullConsumerImpl.shutdown();
} }
/**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override @Override
public void subscribe(String topic, String subExpression) throws MQClientException { public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
} }
/**
* Subscribe some topic with selector.
*
* @param messageSelector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/
@Override @Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
} }
/**
* Unsubscribe consumption some topic
*
* @param topic Message topic that needs to be unsubscribe.
*/
@Override @Override
public void unsubscribe(String topic) { public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
} }
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
@Override @Override
public void assign(Collection<MessageQueue> messageQueues) { public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
} }
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
@Override @Override
public List<MessageExt> poll() { public List<MessageExt> poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
} }
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
@Override @Override
public List<MessageExt> poll(long timeout) { public List<MessageExt> poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout); return defaultLitePullConsumerImpl.poll(timeout);
} }
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
@Override @Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException { public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
} }
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
@Override @Override
public void pause(Collection<MessageQueue> messageQueues) { public void pause(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
} }
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
@Override @Override
public void resume(Collection<MessageQueue> messageQueues) { public void resume(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
} }
/**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
@Override @Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException { public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
} }
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
@Override @Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp); return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
} }
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes.
* @throws MQClientException if there is any client error.
*/
@Override
public void registerTopicMessageQueueChangeListener(String topic, public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException { TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener); this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
} }
/**
* Manually commit consume offset.
*/
@Override @Override
public void commitSync() { public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync(); this.defaultLitePullConsumerImpl.commitSync();
} }
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue);
}
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
@Override @Override
public boolean isAutoCommit() { public boolean isAutoCommit() {
return autoCommit; return autoCommit;
} }
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
@Override @Override
public void setAutoCommit(boolean autoCommit) { public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit; this.autoCommit = autoCommit;
} }
public long getConsumeTimeout() { public int getPullThreadNums() {
return consumeTimeout; return pullThreadNums;
}
public void setConsumeTimeout(long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
} }
public int getPullThreadNumbers() { public void setPullThreadNums(int pullThreadNums) {
return pullThreadNumbers; this.pullThreadNums = pullThreadNums;
} }
public void setPullThreadNumbers(int pullThreadNumbers) { public long getAutoCommitIntervalMillis() {
this.pullThreadNumbers = pullThreadNumbers; return autoCommitIntervalMillis;
} }
public long getAutoCommitInterval() { public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) {
return autoCommitInterval; this.autoCommitIntervalMillis = autoCommitIntervalMillis;
}
public void setAutoCommitInterval(long autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
} }
public int getPullBatchNums() { public int getPullBatchNums() {
return pullBatchNums; return pullBatchSize;
} }
public void setPullBatchNums(int pullBatchNums) { public void setPullBatchNums(int pullBatchNums) {
this.pullBatchNums = pullBatchNums; this.pullBatchSize = pullBatchNums;
} }
public long getPullThresholdForAll() { public long getPullThresholdForAll() {
...@@ -349,14 +468,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -349,14 +468,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.unitMode = isUnitMode; this.unitMode = isUnitMode;
} }
public int getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
public MessageModel getMessageModel() { public MessageModel getMessageModel() {
return messageModel; return messageModel;
} }
...@@ -392,12 +503,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -392,12 +503,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
} }
public long getPullDelayTimeMills() {
return pullDelayTimeMills;
}
public void setPullDelayTimeMills(long pullDelayTimeMills) {
this.pullDelayTimeMills = pullDelayTimeMills;
}
} }
...@@ -39,7 +39,8 @@ public interface LitePullConsumer { ...@@ -39,7 +39,8 @@ public interface LitePullConsumer {
* Subscribe some topic with subExpression * Subscribe some topic with subExpression
* *
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all * null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/ */
void subscribe(final String topic, final String subExpression) throws MQClientException; void subscribe(final String topic, final String subExpression) throws MQClientException;
...@@ -47,37 +48,128 @@ public interface LitePullConsumer { ...@@ -47,37 +48,128 @@ public interface LitePullConsumer {
* Subscribe some topic with selector. * Subscribe some topic with selector.
* *
* @param selector message selector({@link MessageSelector}), can be null. * @param selector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/ */
void subscribe(final String topic, final MessageSelector selector) throws MQClientException; void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
/** /**
* Unsubscribe consumption some topic * Unsubscribe consumption some topic
* *
* @param topic message topic * @param topic Message topic that needs to be unsubscribe.
*/ */
void unsubscribe(final String topic); void unsubscribe(final String topic);
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
void assign(Collection<MessageQueue> messageQueues); void assign(Collection<MessageQueue> messageQueues);
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
List<MessageExt> poll(); List<MessageExt> poll();
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
List<MessageExt> poll(long timeout); List<MessageExt> poll(long timeout);
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
void seek(MessageQueue messageQueue, long offset) throws MQClientException; void seek(MessageQueue messageQueue, long offset) throws MQClientException;
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
void pause(Collection<MessageQueue> messageQueues); void pause(Collection<MessageQueue> messageQueues);
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
void resume(Collection<MessageQueue> messageQueues);
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
boolean isAutoCommit(); boolean isAutoCommit();
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
void setAutoCommit(boolean autoCommit); void setAutoCommit(boolean autoCommit);
void resume(Collection<MessageQueue> messageQueues); /**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException; Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException; Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
/**
* Manually commit consume offset.
*/
void commitSync(); void commitSync();
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
Long committed(MessageQueue messageQueue) throws MQClientException;
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
* TopicMessageQueueChangeListener}
* @throws MQClientException if there is any client error.
*/
void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
} }
...@@ -26,12 +26,12 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -26,12 +26,12 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue { public class AssignedMessageQueue {
private ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState; private ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
public AssignedMessageQueue() { public AssignedMessageQueue() {
assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>(); assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();
} }
public void setRebalanceImpl(RebalanceImpl rebalanceImpl) { public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
...@@ -43,99 +43,99 @@ public class AssignedMessageQueue { ...@@ -43,99 +43,99 @@ public class AssignedMessageQueue {
} }
public boolean isPaused(MessageQueue messageQueue) { public boolean isPaused(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.isPaused(); return messageQueueState.isPaused();
} }
return true; return true;
} }
public void pause(Collection<MessageQueue> messageQueues) { public void pause(Collection<MessageQueue> messageQueues) {
for (MessageQueue messageQueue : messageQueues) { for (MessageQueue messageQueue : messageQueues) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) { if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.getPausedLatch().reset(); messageQueueState.getPausedLatch().reset();
messageQueueStat.setPaused(true); messageQueueState.setPaused(true);
} }
} }
} }
public void resume(Collection<MessageQueue> messageQueueCollection) { public void resume(Collection<MessageQueue> messageQueueCollection) {
for (MessageQueue messageQueue : messageQueueCollection) { for (MessageQueue messageQueue : messageQueueCollection) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) { if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.setPaused(false); messageQueueState.setPaused(false);
messageQueueStat.getPausedLatch().reset(); messageQueueState.getPausedLatch().reset();
} }
} }
} }
public ProcessQueue getProcessQueue(MessageQueue messageQueue) { public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.getProcessQueue(); return messageQueueState.getProcessQueue();
} }
return null; return null;
} }
public long getPullOffset(MessageQueue messageQueue) { public long getPullOffset(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.getPullOffset(); return messageQueueState.getPullOffset();
} }
return -1; return -1;
} }
public void updatePullOffset(MessageQueue messageQueue, long offset) { public void updatePullOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
messageQueueStat.setPullOffset(offset); messageQueueState.setPullOffset(offset);
} }
} }
public long getConusmerOffset(MessageQueue messageQueue) { public long getConusmerOffset(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.getConsumeOffset(); return messageQueueState.getConsumeOffset();
} }
return -1; return -1;
} }
public void updateConsumeOffset(MessageQueue messageQueue, long offset) { public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
messageQueueStat.setConsumeOffset(offset); messageQueueState.setConsumeOffset(offset);
} }
} }
public void setSeekOffset(MessageQueue messageQueue, long offset) { public void setSeekOffset(MessageQueue messageQueue, long offset) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
messageQueueStat.setSeekOffset(offset); messageQueueState.setSeekOffset(offset);
} }
} }
public long getSeekOffset(MessageQueue messageQueue) { public long getSeekOffset(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.getSeekOffset(); return messageQueueState.getSeekOffset();
} }
return -1; return -1;
} }
public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) { public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) { if (messageQueueState != null) {
return messageQueueStat.getPausedLatch(); return messageQueueState.getPausedLatch();
} }
return null; return null;
} }
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) { public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) { synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator(); Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next(); Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) { if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) { if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true); next.getValue().getProcessQueue().setDropped(true);
...@@ -149,9 +149,9 @@ public class AssignedMessageQueue { ...@@ -149,9 +149,9 @@ public class AssignedMessageQueue {
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) { public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) { synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator(); Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next(); Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (!assigned.contains(next.getKey())) { if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true); next.getValue().getProcessQueue().setDropped(true);
it.remove(); it.remove();
...@@ -164,23 +164,23 @@ public class AssignedMessageQueue { ...@@ -164,23 +164,23 @@ public class AssignedMessageQueue {
private void addAssignedMessageQueue(Collection<MessageQueue> assigned) { private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
for (MessageQueue messageQueue : assigned) { for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) { if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueStat messageQueueStat; MessageQueueState messageQueueState;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) { if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue)); messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else { } else {
ProcessQueue processQueue = new ProcessQueue(); ProcessQueue processQueue = new ProcessQueue();
messageQueueStat = new MessageQueueStat(messageQueue, processQueue); messageQueueState = new MessageQueueState(messageQueue, processQueue);
} }
this.assignedMessageQueueState.put(messageQueue, messageQueueStat); this.assignedMessageQueueState.put(messageQueue, messageQueueState);
} }
} }
} }
public void removeAssignedMessageQueue(String topic) { public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) { synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator(); Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next(); Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) { if (next.getKey().getTopic().equals(topic)) {
it.remove(); it.remove();
} }
...@@ -188,7 +188,7 @@ public class AssignedMessageQueue { ...@@ -188,7 +188,7 @@ public class AssignedMessageQueue {
} }
} }
public class MessageQueueStat { private class MessageQueueState {
private MessageQueue messageQueue; private MessageQueue messageQueue;
private ProcessQueue processQueue; private ProcessQueue processQueue;
private volatile boolean paused = false; private volatile boolean paused = false;
...@@ -197,7 +197,7 @@ public class AssignedMessageQueue { ...@@ -197,7 +197,7 @@ public class AssignedMessageQueue {
private volatile long seekOffset = -1; private volatile long seekOffset = -1;
private CountDownLatch2 pausedLatch = new CountDownLatch2(1); private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) { public MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue; this.messageQueue = messageQueue;
this.processQueue = processQueue; this.processQueue = processQueue;
} }
......
...@@ -97,9 +97,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -97,9 +97,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
NONE, SUBSCRIBE, ASSIGN NONE, SUBSCRIBE, ASSIGN
} }
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running."; private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Cannot select two subscription types at the same time."; private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/** /**
* the type of subscription * the type of subscription
*/ */
...@@ -116,10 +116,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -116,10 +116,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
* Delay some time when suspend pull service * Delay some time when suspend pull service
*/ */
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
/**
* Delay some time when no new message
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0;
private DefaultLitePullConsumer defaultLitePullConsumer; private DefaultLitePullConsumer defaultLitePullConsumer;
...@@ -150,7 +146,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -150,7 +146,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.defaultLitePullConsumer = defaultLitePullConsumer; this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNumbers(), this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
); );
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
...@@ -219,11 +215,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -219,11 +215,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case CREATE_JUST: case CREATE_JUST:
break; break;
case RUNNING: case RUNNING:
this.persistConsumerOffset(); persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup()); this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
this.mQClientFactory.shutdown(); this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup()); log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
scheduledThreadPoolExecutor.shutdown(); scheduledThreadPoolExecutor.shutdown();
scheduledExecutorService.shutdown();
this.serviceState = ServiceState.SHUTDOWN_ALREADY; this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break; break;
case SHUTDOWN_ALREADY: case SHUTDOWN_ALREADY:
...@@ -430,6 +427,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -430,6 +427,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, subExpression); topic, subExpression);
...@@ -447,6 +447,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -447,6 +447,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
if (messageSelector == null) { if (messageSelector == null) {
subscribe(topic, SubscriptionData.SUB_ALL); subscribe(topic, SubscriptionData.SUB_ALL);
...@@ -487,7 +490,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -487,7 +490,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (now >= nextAutoCommitDeadline) { if (now >= nextAutoCommitDeadline) {
commitAll(); commitAll();
nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitInterval() * 1000; nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis();
} }
} }
...@@ -570,21 +573,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -570,21 +573,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public long maxOffset(MessageQueue mq) throws MQClientException { private long maxOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
} }
public long minOffset(MessageQueue mq) throws MQClientException { private long minOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
return this.mQClientFactory.getMQAdminImpl().minOffset(mq); return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
} }
public void removePullTaskCallback(final String topic) { private void removePullTaskCallback(final String topic) {
removePullTask(topic); removePullTask(topic);
} }
public void removePullTask(final String topic) { private void removePullTask(final String topic) {
Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<MessageQueue, PullTaskImpl> next = it.next(); Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
...@@ -615,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -615,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public synchronized void commitAll() { private synchronized void commitAll() {
try { try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue); long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
...@@ -635,9 +638,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -635,9 +638,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) {
if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) { if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset); assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset);
} }
} }
...@@ -649,27 +652,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -649,27 +652,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
private long fetchConsumeOffset(MessageQueue mq, boolean fromStore) { private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) {
checkServiceState(); checkServiceState();
return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
} }
private long nextPullOffset(MessageQueue remoteQueue) { public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
if (offset == -2)
throw new MQClientException("Fetch consume offset from broker exception", null);
return offset;
}
private long nextPullOffset(MessageQueue messageQueue) {
long offset = -1; long offset = -1;
long seekOffset = assignedMessageQueue.getSeekOffset(remoteQueue); long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) { if (seekOffset != -1) {
offset = seekOffset; offset = seekOffset;
assignedMessageQueue.setSeekOffset(remoteQueue, -1); assignedMessageQueue.setSeekOffset(messageQueue, -1);
assignedMessageQueue.updatePullOffset(remoteQueue, offset); assignedMessageQueue.updatePullOffset(messageQueue, offset);
} else { } else {
offset = assignedMessageQueue.getPullOffset(remoteQueue); offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) { if (offset == -1) {
offset = fetchConsumeOffset(remoteQueue, false); offset = fetchConsumeOffset(messageQueue, false);
if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offset = 0; offset = 0;
} }
assignedMessageQueue.updatePullOffset(remoteQueue, offset); assignedMessageQueue.updatePullOffset(messageQueue, offset);
assignedMessageQueue.updateConsumeOffset(remoteQueue, offset); assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
} }
} }
...@@ -751,7 +762,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -751,7 +762,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
long offset = nextPullOffset(messageQueue); long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills(); long pullDelayTimeMills = 0;
try { try {
SubscriptionData subscriptionData; SubscriptionData subscriptionData;
...@@ -775,9 +786,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -775,9 +786,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case OFFSET_ILLEGAL: case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString()); log.warn("The pull request offset illegal, {}", pullResult.toString());
break; break;
case NO_NEW_MSG:
pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
break;
default: default:
break; break;
} }
......
...@@ -33,6 +33,7 @@ import org.apache.rocketmq.client.impl.FindBrokerResult; ...@@ -33,6 +33,7 @@ import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl; import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
...@@ -44,7 +45,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; ...@@ -44,7 +45,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.junit.After; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -56,6 +57,7 @@ import org.mockito.stubbing.Answer; ...@@ -56,6 +57,7 @@ import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -77,7 +79,6 @@ public class DefaultLitePullConsumerTest { ...@@ -77,7 +79,6 @@ public class DefaultLitePullConsumerTest {
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore; private OffsetStore offsetStore;
private DefaultLitePullConsumer litePullConsumer;
private DefaultLitePullConsumerImpl litePullConsumerImpl; private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup"; private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest"; private String topic = "LitePullConsumerTest";
...@@ -85,20 +86,150 @@ public class DefaultLitePullConsumerTest { ...@@ -85,20 +86,150 @@ public class DefaultLitePullConsumerTest {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
String groupName = consumerGroup + System.currentTimeMillis();
litePullConsumer = new DefaultLitePullConsumer(groupName);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true); field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval"); field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true); field.setAccessible(true);
field.set(rebalanceService, 100); field.set(rebalanceService, 100);
}
litePullConsumer.start(); @Test
public void testAssign_PollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscribe_PollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createSubscribeLitePullConsumer();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscribe_BroadcastPollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createBroadcastLitePullConsumer();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscriptionType_AssignAndSubscribeExclusive() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
try {
litePullConsumer.subscribe(topic, "*");
litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("Subscribe and assign are mutually exclusive.");
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
litePullConsumer.fetchMessageQueues(topic);
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("The consumer not running, please start it first.");
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSeek_SeekOffsetIllegal() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("min offset = 0");
}
try {
litePullConsumer.seek(messageQueue, 1000);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("max offset = 100");
}
litePullConsumer.shutdown();
}
field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); @Test
public void testSeek_SeekOffsetSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.seek(messageQueue, 50);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50);
assertEquals(assignedMessageQueue.getConusmerOffset(messageQueue), 50);
litePullConsumer.shutdown();
}
@Test
public void testSeek_MessageQueueNotInAssignList() throws Exception {
DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
try {
litePullConsumer.seek(createMessageQueue(), 0);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("The message queue is not in assigned list");
} finally {
litePullConsumer.shutdown();
}
}
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true); field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
...@@ -156,96 +287,35 @@ public class DefaultLitePullConsumerTest { ...@@ -156,96 +287,35 @@ public class DefaultLitePullConsumerTest {
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
} }
@After private DefaultLitePullConsumer createSubscribeLitePullConsumer() throws Exception {
public void terminate() { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.shutdown(); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
}
@Test
public void testAssign_PollMessageSuccess() {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.setPullDelayTimeMills(60 * 1000);
litePullConsumer.assign(Collections.singletonList(messageQueue));
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
}
@Test
public void testSubscribe_PollMessageSuccess() throws MQClientException {
litePullConsumer.setPullDelayTimeMills(60 * 1000);
litePullConsumer.subscribe(topic, "*"); litePullConsumer.subscribe(topic, "*");
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
}
@Test litePullConsumer.start();
public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException { initDefaultLitePullConsumer(litePullConsumer);
try { return litePullConsumer;
litePullConsumer.subscribe(topic, "*");
litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time.");
}
}
@Test
public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
try {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumer();
litePullConsumer.fetchMessageQueues(topic);
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("The consumer not running.");
}
}
@Test
public void testSeek_SeekOffsetIllegal() throws MQClientException {
when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("min offset = 0");
}
try {
litePullConsumer.seek(messageQueue, 1000);
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("max offset = 100");
}
} }
@Test private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception {
public void testSeek_MessageQueueNotInAssignList() { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
try { litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.seek(createMessageQueue(), 0); litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
failBecauseExceptionWasNotThrown(MQClientException.class); litePullConsumer.subscribe(topic, "*");
} catch (MQClientException e) { litePullConsumer.start();
assertThat(e).hasMessageContaining("The message queue is not in assigned list"); initDefaultLitePullConsumer(litePullConsumer);
} return litePullConsumer;
} }
private MessageQueue createMessageQueue() { private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception {
MessageQueue messageQueue = new MessageQueue(); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
messageQueue.setBrokerName(brokerName); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
messageQueue.setQueueId(0); litePullConsumer.start();
messageQueue.setTopic(topic); initDefaultLitePullConsumer(litePullConsumer);
return messageQueue; return litePullConsumer;
} }
private DefaultLitePullConsumer createLitePullConsumer() { private DefaultLitePullConsumer createNotStartLitePullConsumer() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
return litePullConsumer; return litePullConsumer;
} }
...@@ -258,4 +328,5 @@ public class DefaultLitePullConsumerTest { ...@@ -258,4 +328,5 @@ public class DefaultLitePullConsumerTest {
} }
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
} }
} }
...@@ -18,31 +18,35 @@ package org.apache.rocketmq.example.simple; ...@@ -18,31 +18,35 @@ package org.apache.rocketmq.example.simple;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumer { public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false); litePullConsumer.setAutoCommit(false);
litePullConsumer.start(); litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> list = new ArrayList<>(mqSet);
Collection<MessageQueue> assginMq = Collections.singletonList(list.get(0)); List<MessageQueue> assignList = new ArrayList<>();
litePullConsumer.assign(assginMq); for (int i = 0; i < list.size() / 2; i++) {
int size = 0; assignList.add(list.get(i));
litePullConsumer.seek(list.get(0), 26); }
litePullConsumer.assign(assignList);
while (true) { litePullConsumer.seek(assignList.get(0), 10);
List<MessageExt> messageExts = litePullConsumer.poll(); try {
if (messageExts != null) { while (running) {
size += messageExts.size(); List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
} }
litePullConsumer.commitSync(); } finally {
System.out.printf("%s %d %n", messageExts, size); litePullConsumer.shutdown();
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
if (messageExts != null) {
System.out.printf("%s%n", messageExts);
}
}
} finally {
litePullConsumer.shutdown();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册