diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 543e9cffb825a7ced2eba66914dace206fc280c0..33d9e5fc9f9d81a696cf2dadf356cae69943a7a5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -77,28 +77,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private boolean unitMode = false;
- private int maxReconsumeTimes = 16;
/**
- * Maximum amount of time in minutes a message may block the consuming thread.
+ * The flag for auto commit offset
*/
- private long consumeTimeout = 15;
+ private boolean autoCommit = true;
/**
- * Is auto commit offset
+ * Pull thread number
*/
- private boolean autoCommit = true;
-
- private int pullThreadNumbers = 20;
+ private int pullThreadNums = 20;
/**
- * Maximum commit offset interval time in seconds.
+ * Maximum commit offset interval time in milliseconds.
*/
- private long autoCommitInterval = 5;
+ private long autoCommitIntervalMillis = 5 * 1000;
/**
* Maximum number of messages pulled each time.
*/
- private int pullBatchNums = 10;
+ private int pullBatchSize = 10;
/**
* Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
@@ -132,22 +129,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private long pollTimeoutMillis = 1000 * 5;
/**
- * Message pull delay in milliseconds
+ * Default constructor.
*/
- private long pullDelayTimeMills = 0;
-
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
+ /**
+ * Constructor specifying consumer group.
+ *
+ * @param consumerGroup Consumer group.
+ */
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
}
+ /**
+ * Constructor specifying RPC hook.
+ *
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
public DefaultLitePullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}
+ /**
+ * Constructor specifying consumer group, RPC hook
+ *
+ * @param consumerGroup Consumer group.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
}
@@ -164,121 +175,229 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
+ /**
+ * Start the consumer
+ */
@Override
public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start();
}
+ /**
+ * Shutdown the consumer
+ */
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
}
+ /**
+ * Subscribe some topic with subExpression
+ *
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
+ * null or * expression,meaning subscribe all
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
+ /**
+ * Subscribe some topic with selector.
+ *
+ * @param messageSelector message selector({@link MessageSelector}), can be null.
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
+ /**
+ * Unsubscribe consumption some topic
+ *
+ * @param topic Message topic that needs to be unsubscribe.
+ */
@Override
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
+ /**
+ * Manually assign a list of message queues to this consumer. This interface does not allow for incremental
+ * assignment and will replace the previous assignment (if there is one).
+ *
+ * @param messageQueues Message queues that needs to be assigned.
+ */
@Override
public void assign(Collection messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @return list of message, can be null.
+ */
@Override
public List poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
}
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
+ * negative
+ * @return list of message, can be null.
+ */
@Override
public List poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout);
}
+ /**
+ * Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
+ * message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
+ * this API is arbitrarily used in the middle of consumption.
+ *
+ * @param messageQueue
+ * @param offset
+ */
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
+ /**
+ * Suspend pulling from the requested message queues.
+ *
+ * Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
+ * messages of the requested message queues drain.
+ *
+ * Note that this method does not affect message queue subscription. In particular, it does not cause a group
+ * rebalance.
+ *
+ * @param messageQueues Message queues that needs to be paused.
+ */
@Override
public void pause(Collection messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
+ /**
+ * Resume specified message queues which have been paused with {@link #pause(Collection)}.
+ *
+ * @param messageQueues Message queues that needs to be resumed.
+ */
@Override
public void resume(Collection messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
+ /**
+ * Get metadata about the message queues for a given topic.
+ *
+ * @param topic The topic that need to get metadata.
+ * @return collection of message queues
+ * @throws MQClientException if there is any client error.
+ */
@Override
public Collection fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
+ /**
+ * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
+ * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
+ * queue.
+ *
+ * @param messageQueue Message queues that needs to get offset by timestamp.
+ * @param timestamp
+ * @return offset
+ * @throws MQClientException if there is any client error.
+ */
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
}
+ /**
+ * Register a callback for sensing topic metadata changes.
+ *
+ * @param topic The topic that need to monitor.
+ * @param topicMessageQueueChangeListener Callback when topic metadata changes.
+ * @throws MQClientException if there is any client error.
+ */
+ @Override
public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
+ /**
+ * Manually commit consume offset.
+ */
@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync();
}
+ /**
+ * Get the last committed offset for the given message queue.
+ *
+ * @param messageQueue
+ * @return offset, if offset equals -1 means no offset in broker.
+ * @throws MQClientException if there is any client error.
+ */
+ @Override
+ public Long committed(MessageQueue messageQueue) throws MQClientException {
+ return this.defaultLitePullConsumerImpl.committed(messageQueue);
+ }
+
+ /**
+ * Whether to enable auto-commit consume offset.
+ *
+ * @return true if enable auto-commit, false if disable auto-commit.
+ */
@Override
public boolean isAutoCommit() {
return autoCommit;
}
+ /**
+ * Set whether to enable auto-commit consume offset.
+ *
+ * @param autoCommit Whether to enable auto-commit.
+ */
@Override
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
- public long getConsumeTimeout() {
- return consumeTimeout;
- }
-
- public void setConsumeTimeout(long consumeTimeout) {
- this.consumeTimeout = consumeTimeout;
+ public int getPullThreadNums() {
+ return pullThreadNums;
}
- public int getPullThreadNumbers() {
- return pullThreadNumbers;
+ public void setPullThreadNums(int pullThreadNums) {
+ this.pullThreadNums = pullThreadNums;
}
- public void setPullThreadNumbers(int pullThreadNumbers) {
- this.pullThreadNumbers = pullThreadNumbers;
+ public long getAutoCommitIntervalMillis() {
+ return autoCommitIntervalMillis;
}
- public long getAutoCommitInterval() {
- return autoCommitInterval;
- }
-
- public void setAutoCommitInterval(long autoCommitInterval) {
- this.autoCommitInterval = autoCommitInterval;
+ public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) {
+ this.autoCommitIntervalMillis = autoCommitIntervalMillis;
}
public int getPullBatchNums() {
- return pullBatchNums;
+ return pullBatchSize;
}
public void setPullBatchNums(int pullBatchNums) {
- this.pullBatchNums = pullBatchNums;
+ this.pullBatchSize = pullBatchNums;
}
public long getPullThresholdForAll() {
@@ -349,14 +468,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.unitMode = isUnitMode;
}
- public int getMaxReconsumeTimes() {
- return maxReconsumeTimes;
- }
-
- public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
- this.maxReconsumeTimes = maxReconsumeTimes;
- }
-
public MessageModel getMessageModel() {
return messageModel;
}
@@ -392,12 +503,4 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
-
- public long getPullDelayTimeMills() {
- return pullDelayTimeMills;
- }
-
- public void setPullDelayTimeMills(long pullDelayTimeMills) {
- this.pullDelayTimeMills = pullDelayTimeMills;
- }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 87b9dd354b0d34b8bfc0442b93088d8db194caad..d6e657ff18cf491b5893fae0da127fe90f19aa75 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -39,7 +39,8 @@ public interface LitePullConsumer {
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
- * null or * expression,meaning subscribe all
+ * null or * expression,meaning subscribe all
+ * @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
@@ -47,37 +48,128 @@ public interface LitePullConsumer {
* Subscribe some topic with selector.
*
* @param selector message selector({@link MessageSelector}), can be null.
+ * @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
/**
* Unsubscribe consumption some topic
*
- * @param topic message topic
+ * @param topic Message topic that needs to be unsubscribe.
*/
void unsubscribe(final String topic);
+ /**
+ * Manually assign a list of message queues to this consumer. This interface does not allow for incremental
+ * assignment and will replace the previous assignment (if there is one).
+ *
+ * @param messageQueues Message queues that needs to be assigned.
+ */
void assign(Collection messageQueues);
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @return list of message, can be null.
+ */
List poll();
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
+ * negative
+ * @return list of message, can be null.
+ */
List poll(long timeout);
+ /**
+ * Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
+ * message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
+ * this API is arbitrarily used in the middle of consumption.
+ *
+ * @param messageQueue
+ * @param offset
+ */
void seek(MessageQueue messageQueue, long offset) throws MQClientException;
+ /**
+ * Suspend pulling from the requested message queues.
+ *
+ * Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
+ * messages of the requested message queues drain.
+ *
+ * Note that this method does not affect message queue subscription. In particular, it does not cause a group
+ * rebalance.
+ *
+ * @param messageQueues Message queues that needs to be paused.
+ */
void pause(Collection messageQueues);
+ /**
+ * Resume specified message queues which have been paused with {@link #pause(Collection)}.
+ *
+ * @param messageQueues Message queues that needs to be resumed.
+ */
+ void resume(Collection messageQueues);
+
+ /**
+ * Whether to enable auto-commit consume offset.
+ *
+ * @return true if enable auto-commit, false if disable auto-commit.
+ */
boolean isAutoCommit();
+ /**
+ * Set whether to enable auto-commit consume offset.
+ *
+ * @param autoCommit Whether to enable auto-commit.
+ */
void setAutoCommit(boolean autoCommit);
- void resume(Collection messageQueues);
-
+ /**
+ * Get metadata about the message queues for a given topic.
+ *
+ * @param topic The topic that need to get metadata.
+ * @return collection of message queues
+ * @throws MQClientException if there is any client error.
+ */
Collection fetchMessageQueues(String topic) throws MQClientException;
+ /**
+ * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
+ * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
+ * queue.
+ *
+ * @param messageQueue Message queues that needs to get offset by timestamp.
+ * @param timestamp
+ * @return offset
+ * @throws MQClientException if there is any client error.
+ */
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
+ /**
+ * Manually commit consume offset.
+ */
void commitSync();
+ /**
+ * Get the last committed offset for the given message queue.
+ *
+ * @param messageQueue
+ * @return offset, if offset equals -1 means no offset in broker.
+ * @throws MQClientException if there is any client error.
+ */
+ Long committed(MessageQueue messageQueue) throws MQClientException;
+ /**
+ * Register a callback for sensing topic metadata changes.
+ *
+ * @param topic The topic that need to monitor.
+ * @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
+ * TopicMessageQueueChangeListener}
+ * @throws MQClientException if there is any client error.
+ */
+ void registerTopicMessageQueueChangeListener(String topic,
+ TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 8dcaa30e980bd635a2e9390b9fc76c528f282beb..609fc4d1eff42ae7837bea7b9ffba9f465f57cb0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -26,12 +26,12 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
- private ConcurrentHashMap assignedMessageQueueState;
+ private ConcurrentHashMap assignedMessageQueueState;
private RebalanceImpl rebalanceImpl;
public AssignedMessageQueue() {
- assignedMessageQueueState = new ConcurrentHashMap();
+ assignedMessageQueueState = new ConcurrentHashMap();
}
public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
@@ -43,99 +43,99 @@ public class AssignedMessageQueue {
}
public boolean isPaused(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.isPaused();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.isPaused();
}
return true;
}
public void pause(Collection messageQueues) {
for (MessageQueue messageQueue : messageQueues) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
- messageQueueStat.getPausedLatch().reset();
- messageQueueStat.setPaused(true);
+ messageQueueState.getPausedLatch().reset();
+ messageQueueState.setPaused(true);
}
}
}
public void resume(Collection messageQueueCollection) {
for (MessageQueue messageQueue : messageQueueCollection) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
- messageQueueStat.setPaused(false);
- messageQueueStat.getPausedLatch().reset();
+ messageQueueState.setPaused(false);
+ messageQueueState.getPausedLatch().reset();
}
}
}
public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.getProcessQueue();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.getProcessQueue();
}
return null;
}
public long getPullOffset(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.getPullOffset();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.getPullOffset();
}
return -1;
}
public void updatePullOffset(MessageQueue messageQueue, long offset) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- messageQueueStat.setPullOffset(offset);
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ messageQueueState.setPullOffset(offset);
}
}
public long getConusmerOffset(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.getConsumeOffset();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.getConsumeOffset();
}
return -1;
}
public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- messageQueueStat.setConsumeOffset(offset);
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ messageQueueState.setConsumeOffset(offset);
}
}
public void setSeekOffset(MessageQueue messageQueue, long offset) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- messageQueueStat.setSeekOffset(offset);
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ messageQueueState.setSeekOffset(offset);
}
}
public long getSeekOffset(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.getSeekOffset();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.getSeekOffset();
}
return -1;
}
public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
- MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
- if (messageQueueStat != null) {
- return messageQueueStat.getPausedLatch();
+ MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
+ if (messageQueueState != null) {
+ return messageQueueState.getPausedLatch();
}
return null;
}
public void updateAssignedMessageQueue(String topic, Collection assigned) {
synchronized (this.assignedMessageQueueState) {
- Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
+ Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry next = it.next();
+ Map.Entry next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
@@ -149,9 +149,9 @@ public class AssignedMessageQueue {
public void updateAssignedMessageQueue(Collection assigned) {
synchronized (this.assignedMessageQueueState) {
- Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
+ Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry next = it.next();
+ Map.Entry next = it.next();
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
@@ -164,23 +164,23 @@ public class AssignedMessageQueue {
private void addAssignedMessageQueue(Collection assigned) {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
- MessageQueueStat messageQueueStat;
+ MessageQueueState messageQueueState;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
- messageQueueStat = new MessageQueueStat(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
+ messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
- messageQueueStat = new MessageQueueStat(messageQueue, processQueue);
+ messageQueueState = new MessageQueueState(messageQueue, processQueue);
}
- this.assignedMessageQueueState.put(messageQueue, messageQueueStat);
+ this.assignedMessageQueueState.put(messageQueue, messageQueueState);
}
}
}
public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) {
- Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
+ Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry next = it.next();
+ Map.Entry next = it.next();
if (next.getKey().getTopic().equals(topic)) {
it.remove();
}
@@ -188,7 +188,7 @@ public class AssignedMessageQueue {
}
}
- public class MessageQueueStat {
+ private class MessageQueueState {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private volatile boolean paused = false;
@@ -197,7 +197,7 @@ public class AssignedMessageQueue {
private volatile long seekOffset = -1;
private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
- public MessageQueueStat(MessageQueue messageQueue, ProcessQueue processQueue) {
+ public MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8ff2b39c4d15946574f876fd75352b8fb886ab60..2fb33385d575a7342e9ffebaaea411b089a98b78 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -97,9 +97,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
NONE, SUBSCRIBE, ASSIGN
}
- private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running.";
+ private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
- private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Cannot select two subscription types at the same time.";
+ private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/**
* the type of subscription
*/
@@ -116,10 +116,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
* Delay some time when suspend pull service
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
- /**
- * Delay some time when no new message
- */
- private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0;
private DefaultLitePullConsumer defaultLitePullConsumer;
@@ -150,7 +146,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.defaultLitePullConsumer.getPullThreadNumbers(),
+ this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -219,11 +215,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case CREATE_JUST:
break;
case RUNNING:
- this.persistConsumerOffset();
+ persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
scheduledThreadPoolExecutor.shutdown();
+ scheduledExecutorService.shutdown();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
@@ -430,6 +427,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
+ if (topic == null || topic.equals("")) {
+ throw new IllegalArgumentException("Topic can not be null or empty.");
+ }
setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
topic, subExpression);
@@ -447,6 +447,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try {
+ if (topic == null || topic.equals("")) {
+ throw new IllegalArgumentException("Topic can not be null or empty.");
+ }
setSubscriptionType(SubscriptionType.SUBSCRIBE);
if (messageSelector == null) {
subscribe(topic, SubscriptionData.SUB_ALL);
@@ -487,7 +490,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long now = System.currentTimeMillis();
if (now >= nextAutoCommitDeadline) {
commitAll();
- nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitInterval() * 1000;
+ nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis();
}
}
@@ -570,21 +573,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- public long maxOffset(MessageQueue mq) throws MQClientException {
+ private long maxOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
- return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
}
- public long minOffset(MessageQueue mq) throws MQClientException {
+ private long minOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
- return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+ return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
}
- public void removePullTaskCallback(final String topic) {
+ private void removePullTaskCallback(final String topic) {
removePullTask(topic);
}
- public void removePullTask(final String topic) {
+ private void removePullTask(final String topic) {
Iterator> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry next = it.next();
@@ -615,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- public synchronized void commitAll() {
+ private synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
@@ -635,9 +638,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
- if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
- assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset);
+ private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) {
+ if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
+ assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset);
}
}
@@ -649,27 +652,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private long fetchConsumeOffset(MessageQueue mq, boolean fromStore) {
+ private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) {
checkServiceState();
- return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
}
- private long nextPullOffset(MessageQueue remoteQueue) {
+ public long committed(MessageQueue messageQueue) throws MQClientException {
+ checkServiceState();
+ long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+ if (offset == -2)
+ throw new MQClientException("Fetch consume offset from broker exception", null);
+ return offset;
+ }
+
+ private long nextPullOffset(MessageQueue messageQueue) {
long offset = -1;
- long seekOffset = assignedMessageQueue.getSeekOffset(remoteQueue);
+ long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) {
offset = seekOffset;
- assignedMessageQueue.setSeekOffset(remoteQueue, -1);
- assignedMessageQueue.updatePullOffset(remoteQueue, offset);
+ assignedMessageQueue.setSeekOffset(messageQueue, -1);
+ assignedMessageQueue.updatePullOffset(messageQueue, offset);
} else {
- offset = assignedMessageQueue.getPullOffset(remoteQueue);
+ offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
- offset = fetchConsumeOffset(remoteQueue, false);
+ offset = fetchConsumeOffset(messageQueue, false);
if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offset = 0;
}
- assignedMessageQueue.updatePullOffset(remoteQueue, offset);
- assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
+ assignedMessageQueue.updatePullOffset(messageQueue, offset);
+ assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
}
}
@@ -751,7 +762,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
long offset = nextPullOffset(messageQueue);
- long pullDelayTimeMills = defaultLitePullConsumer.getPullDelayTimeMills();
+ long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
@@ -775,9 +786,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
- case NO_NEW_MSG:
- pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
- break;
default:
break;
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 68144c785ea5cbe5dfe37e85a2594c61b524728f..8cceabcb2c2d2364306af1d0bc6b3ce3e494a2e1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
@@ -44,7 +45,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.junit.After;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +57,7 @@ import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -77,7 +79,6 @@ public class DefaultLitePullConsumerTest {
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
- private DefaultLitePullConsumer litePullConsumer;
private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest";
@@ -85,20 +86,150 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
- String groupName = consumerGroup + System.currentTimeMillis();
- litePullConsumer = new DefaultLitePullConsumer(groupName);
- litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
-
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
+ }
- litePullConsumer.start();
+ @Test
+ public void testAssign_PollMessageSuccess() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
+ try {
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ List result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSubscribe_PollMessageSuccess() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createSubscribeLitePullConsumer();
+ try {
+ Set messageQueueSet = new HashSet();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSubscribe_BroadcastPollMessageSuccess() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createBroadcastLitePullConsumer();
+ try {
+ Set messageQueueSet = new HashSet();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSubscriptionType_AssignAndSubscribeExclusive() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
+ try {
+ litePullConsumer.subscribe(topic, "*");
+ litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageContaining("Subscribe and assign are mutually exclusive.");
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
+ try {
+ litePullConsumer.fetchMessageQueues(topic);
+ failBecauseExceptionWasNotThrown(IllegalStateException.class);
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageContaining("The consumer not running, please start it first.");
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSeek_SeekOffsetIllegal() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
+ when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
+ when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ try {
+ litePullConsumer.seek(messageQueue, -1);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("min offset = 0");
+ }
+
+ try {
+ litePullConsumer.seek(messageQueue, 1000);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("max offset = 100");
+ }
+ litePullConsumer.shutdown();
+ }
- field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+ @Test
+ public void testSeek_SeekOffsetSuccess() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
+ when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
+ when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.seek(messageQueue, 50);
+ Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
+ field.setAccessible(true);
+ AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
+ assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50);
+ assertEquals(assignedMessageQueue.getConusmerOffset(messageQueue), 50);
+ litePullConsumer.shutdown();
+ }
+
+ @Test
+ public void testSeek_MessageQueueNotInAssignList() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer();
+ try {
+ litePullConsumer.seek(createMessageQueue(), 0);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("The message queue is not in assigned list");
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ private MessageQueue createMessageQueue() {
+ MessageQueue messageQueue = new MessageQueue();
+ messageQueue.setBrokerName(brokerName);
+ messageQueue.setQueueId(0);
+ messageQueue.setTopic(topic);
+ return messageQueue;
+ }
+
+ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
+
+ Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
@@ -156,96 +287,35 @@ public class DefaultLitePullConsumerTest {
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
}
- @After
- public void terminate() {
- litePullConsumer.shutdown();
- }
-
- @Test
- public void testAssign_PollMessageSuccess() {
- MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.setPullDelayTimeMills(60 * 1000);
- litePullConsumer.assign(Collections.singletonList(messageQueue));
- List result = litePullConsumer.poll();
- assertThat(result.get(0).getTopic()).isEqualTo(topic);
- assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
- }
-
- @Test
- public void testSubscribe_PollMessageSuccess() throws MQClientException {
- litePullConsumer.setPullDelayTimeMills(60 * 1000);
+ private DefaultLitePullConsumer createSubscribeLitePullConsumer() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
- Set messageQueueSet = new HashSet();
- messageQueueSet.add(createMessageQueue());
- litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
- litePullConsumer.setPollTimeoutMillis(20 * 1000);
- List result = litePullConsumer.poll();
- assertThat(result.get(0).getTopic()).isEqualTo(topic);
- assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
- }
- @Test
- public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException {
- try {
- litePullConsumer.subscribe(topic, "*");
- litePullConsumer.assign(Collections.singletonList(createMessageQueue()));
- failBecauseExceptionWasNotThrown(IllegalStateException.class);
- } catch (IllegalStateException e) {
- assertThat(e).hasMessageContaining("Cannot select two subscription types at the same time.");
- }
- }
-
- @Test
- public void testFetchMesseageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
- try {
- DefaultLitePullConsumer litePullConsumer = createLitePullConsumer();
- litePullConsumer.fetchMessageQueues(topic);
- failBecauseExceptionWasNotThrown(IllegalStateException.class);
- } catch (IllegalStateException e) {
- assertThat(e).hasMessageContaining("The consumer not running.");
- }
- }
-
- @Test
- public void testSeek_SeekOffsetIllegal() throws MQClientException {
- when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
- when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
- MessageQueue messageQueue = createMessageQueue();
- litePullConsumer.assign(Collections.singletonList(messageQueue));
- try {
- litePullConsumer.seek(messageQueue, -1);
- failBecauseExceptionWasNotThrown(MQClientException.class);
- } catch (MQClientException e) {
- assertThat(e).hasMessageContaining("min offset = 0");
- }
-
- try {
- litePullConsumer.seek(messageQueue, 1000);
- failBecauseExceptionWasNotThrown(MQClientException.class);
- } catch (MQClientException e) {
- assertThat(e).hasMessageContaining("max offset = 100");
- }
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
}
- @Test
- public void testSeek_MessageQueueNotInAssignList() {
- try {
- litePullConsumer.seek(createMessageQueue(), 0);
- failBecauseExceptionWasNotThrown(MQClientException.class);
- } catch (MQClientException e) {
- assertThat(e).hasMessageContaining("The message queue is not in assigned list");
- }
+ private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ litePullConsumer.setMessageModel(MessageModel.BROADCASTING);
+ litePullConsumer.subscribe(topic, "*");
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
}
- private MessageQueue createMessageQueue() {
- MessageQueue messageQueue = new MessageQueue();
- messageQueue.setBrokerName(brokerName);
- messageQueue.setQueueId(0);
- messageQueue.setTopic(topic);
- return messageQueue;
+ private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
}
- private DefaultLitePullConsumer createLitePullConsumer() {
+ private DefaultLitePullConsumer createNotStartLitePullConsumer() {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
return litePullConsumer;
}
@@ -258,4 +328,5 @@ public class DefaultLitePullConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
+
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
similarity index 70%
rename from example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumer.java
rename to example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
index b0a2b596ade8fe6205f59308d99e71196a979223..e638de1c960ba9fd37a9f16515161d2aa3afe712 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java
@@ -18,31 +18,35 @@ package org.apache.rocketmq.example.simple;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-public class LitePullConsumer {
+public class LitePullConsumerAssign {
+
+ public static volatile boolean running = true;
+
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List list = new ArrayList<>(mqSet);
- Collection assginMq = Collections.singletonList(list.get(0));
- litePullConsumer.assign(assginMq);
- int size = 0;
- litePullConsumer.seek(list.get(0), 26);
-
- while (true) {
- List messageExts = litePullConsumer.poll();
- if (messageExts != null) {
- size += messageExts.size();
+ List assignList = new ArrayList<>();
+ for (int i = 0; i < list.size() / 2; i++) {
+ assignList.add(list.get(i));
+ }
+ litePullConsumer.assign(assignList);
+ litePullConsumer.seek(assignList.get(0), 10);
+ try {
+ while (running) {
+ List messageExts = litePullConsumer.poll();
+ System.out.printf("%s %n", messageExts);
+ litePullConsumer.commitSync();
}
- litePullConsumer.commitSync();
- System.out.printf("%s %d %n", messageExts, size);
+ } finally {
+ litePullConsumer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
new file mode 100644
index 0000000000000000000000000000000000000000..f0dc4d2adf9716bc26cd8b428068cbd5a8eb1623
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class LitePullConsumerSubscribe {
+
+ public static volatile boolean running = true;
+
+ public static void main(String[] args) throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
+ litePullConsumer.subscribe("TopicTest", "*");
+ litePullConsumer.start();
+ try {
+ while (running) {
+ List messageExts = litePullConsumer.poll();
+ if (messageExts != null) {
+ System.out.printf("%s%n", messageExts);
+ }
+ }
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+}