diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 5d6acc0f872bf645ecc28cc1665ab94c8c34725d..1b96cd058ff129dc01d1d18aff44c0f0a6e8da1c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -53,14 +53,17 @@ public class Validators {
if (UtilAll.isBlank(group)) {
throw new MQClientException("the specified group is blank", null);
}
+
+ if (group.length() > CHARACTER_MAX_LENGTH) {
+ throw new MQClientException("the specified group is longer than group max length 255.", null);
+ }
+
if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group,
VALID_PATTERN_STR), null);
}
- if (group.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified group is longer than group max length 255.", null);
- }
+
}
/**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 33d9e5fc9f9d81a696cf2dadf356cae69943a7a5..5a2189f3270062cd2d983625992ef1dfc45c2484 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -128,6 +128,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private long pollTimeoutMillis = 1000 * 5;
+ /**
+ * Interval time in in milliseconds for checking changes in topic metadata.
+ */
+ private long topicMetadataCheckIntervalMillis = 30 * 1000;
+
/**
* Default constructor.
*/
@@ -175,202 +180,92 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
- /**
- * Start the consumer
- */
@Override
public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start();
}
- /**
- * Shutdown the consumer
- */
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
}
- /**
- * Subscribe some topic with subExpression
- *
- * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
- * null or * expression,meaning subscribe all
- * @throws MQClientException if there is any client error.
- */
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
- /**
- * Subscribe some topic with selector.
- *
- * @param messageSelector message selector({@link MessageSelector}), can be null.
- * @throws MQClientException if there is any client error.
- */
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
- /**
- * Unsubscribe consumption some topic
- *
- * @param topic Message topic that needs to be unsubscribe.
- */
@Override
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
- /**
- * Manually assign a list of message queues to this consumer. This interface does not allow for incremental
- * assignment and will replace the previous assignment (if there is one).
- *
- * @param messageQueues Message queues that needs to be assigned.
- */
@Override
public void assign(Collection messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
- /**
- * Fetch data for the topics or partitions specified using assign API
- *
- * @return list of message, can be null.
- */
@Override
public List poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
}
- /**
- * Fetch data for the topics or partitions specified using assign API
- *
- * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
- * negative
- * @return list of message, can be null.
- */
@Override
public List poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout);
}
- /**
- * Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
- * message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
- * this API is arbitrarily used in the middle of consumption.
- *
- * @param messageQueue
- * @param offset
- */
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
- /**
- * Suspend pulling from the requested message queues.
- *
- * Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
- * messages of the requested message queues drain.
- *
- * Note that this method does not affect message queue subscription. In particular, it does not cause a group
- * rebalance.
- *
- * @param messageQueues Message queues that needs to be paused.
- */
@Override
public void pause(Collection messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
- /**
- * Resume specified message queues which have been paused with {@link #pause(Collection)}.
- *
- * @param messageQueues Message queues that needs to be resumed.
- */
@Override
public void resume(Collection messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
- /**
- * Get metadata about the message queues for a given topic.
- *
- * @param topic The topic that need to get metadata.
- * @return collection of message queues
- * @throws MQClientException if there is any client error.
- */
@Override
public Collection fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
- /**
- * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
- * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
- * queue.
- *
- * @param messageQueue Message queues that needs to get offset by timestamp.
- * @param timestamp
- * @return offset
- * @throws MQClientException if there is any client error.
- */
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
}
- /**
- * Register a callback for sensing topic metadata changes.
- *
- * @param topic The topic that need to monitor.
- * @param topicMessageQueueChangeListener Callback when topic metadata changes.
- * @throws MQClientException if there is any client error.
- */
@Override
public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
- /**
- * Manually commit consume offset.
- */
@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync();
}
- /**
- * Get the last committed offset for the given message queue.
- *
- * @param messageQueue
- * @return offset, if offset equals -1 means no offset in broker.
- * @throws MQClientException if there is any client error.
- */
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue);
}
- /**
- * Whether to enable auto-commit consume offset.
- *
- * @return true if enable auto-commit, false if disable auto-commit.
- */
@Override
public boolean isAutoCommit() {
return autoCommit;
}
- /**
- * Set whether to enable auto-commit consume offset.
- *
- * @param autoCommit Whether to enable auto-commit.
- */
@Override
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
@@ -392,12 +287,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.autoCommitIntervalMillis = autoCommitIntervalMillis;
}
- public int getPullBatchNums() {
+ public int getPullBatchSize() {
return pullBatchSize;
}
- public void setPullBatchNums(int pullBatchNums) {
- this.pullBatchSize = pullBatchNums;
+ public void setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
}
public long getPullThresholdForAll() {
@@ -503,4 +398,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
+
+ public long getTopicMetadataCheckIntervalMillis() {
+ return topicMetadataCheckIntervalMillis;
+ }
+
+ public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
+ this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 63dc525e7ef97fb1096922794b01d61df8746082..6b762383717bd01889b355334540f0fca061d6ec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return;
final HashSet unusedMQ = new HashSet();
- if (!mqs.isEmpty()) {
- for (Map.Entry entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- AtomicLong offset = entry.getValue();
- if (offset != null) {
- if (mqs.contains(mq)) {
- try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
- log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
- } catch (Exception e) {
- log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
- }
- } else {
- unusedMQ.add(mq);
+
+ for (Map.Entry entry : this.offsetTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ AtomicLong offset = entry.getValue();
+ if (offset != null) {
+ if (mqs.contains(mq)) {
+ try {
+ this.updateConsumeOffsetToBroker(mq, offset.get());
+ log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
+ this.groupName,
+ this.mQClientFactory.getClientId(),
+ mq,
+ offset.get());
+ } catch (Exception e) {
+ log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
+ } else {
+ unusedMQ.add(mq);
}
}
}
@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
- * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
- * here need to be optimized.
+ * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
@@ -196,8 +194,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
- * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
- * here need to be optimized.
+ * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 609fc4d1eff42ae7837bea7b9ffba9f465f57cb0..c0c6f6030c2599457aef833f389e62a22fcee296 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -21,12 +21,11 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
- private ConcurrentHashMap assignedMessageQueueState;
+ private final ConcurrentHashMap assignedMessageQueueState;
private RebalanceImpl rebalanceImpl;
@@ -54,7 +53,6 @@ public class AssignedMessageQueue {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
- messageQueueState.getPausedLatch().reset();
messageQueueState.setPaused(true);
}
}
@@ -65,7 +63,6 @@ public class AssignedMessageQueue {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(false);
- messageQueueState.getPausedLatch().reset();
}
}
}
@@ -123,14 +120,6 @@ public class AssignedMessageQueue {
return -1;
}
- public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
- MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
- if (messageQueueState != null) {
- return messageQueueState.getPausedLatch();
- }
- return null;
- }
-
public void updateAssignedMessageQueue(String topic, Collection assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator> it = this.assignedMessageQueueState.entrySet().iterator();
@@ -195,9 +184,8 @@ public class AssignedMessageQueue {
private volatile long pullOffset = -1;
private volatile long consumeOffset = -1;
private volatile long seekOffset = -1;
- private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
- public MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
+ private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
@@ -249,9 +237,5 @@ public class AssignedMessageQueue {
public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset;
}
-
- public CountDownLatch2 getPausedLatch() {
- return pausedLatch;
- }
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 2fb33385d575a7342e9ffebaaea411b089a98b78..5dd9c352dc3d841dd2aafef527afa4361926bf8d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -53,7 +53,6 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -142,6 +141,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private long nextAutoCommitDeadline = -1L;
+ private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
@@ -206,8 +207,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private int nextPullBatchNums() {
- return Math.min(this.defaultLitePullConsumer.getPullBatchNums(), consumeRequestCache.remainingCapacity());
+ private int nextPullBatchSize() {
+ return Math.min(this.defaultLitePullConsumer.getPullBatchSize(), consumeRequestCache.remainingCapacity());
}
public synchronized void shutdown() {
@@ -217,13 +218,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
case RUNNING:
persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
- this.mQClientFactory.shutdown();
- log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
scheduledThreadPoolExecutor.shutdown();
scheduledExecutorService.shutdown();
+ this.mQClientFactory.shutdown();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
- break;
- case SHUTDOWN_ALREADY:
+ log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
break;
default:
break;
@@ -237,79 +236,28 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.checkConfig();
- this.copySubscription();
-
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
-
- this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
- this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
- this.pullAPIWrapper = new PullAPIWrapper(
- mQClientFactory,
- this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
- this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
-
- if (this.defaultLitePullConsumer.getOffsetStore() != null) {
- this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
- } else {
- switch (this.defaultLitePullConsumer.getMessageModel()) {
- case BROADCASTING:
- this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
- break;
- case CLUSTERING:
- this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
- break;
- default:
- break;
- }
- this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
- }
+ initMQClientFactory();
- this.offsetStore.load();
+ initRebalanceImpl();
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
- if (!registerOK) {
- this.serviceState = ServiceState.CREATE_JUST;
+ initPullAPIWrapper();
- throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
+ initOffsetStore();
mQClientFactory.start();
- if (subscriptionType == SubscriptionType.SUBSCRIBE) {
- updateTopicSubscribeInfoWhenSubscriptionChanged();
- }
- if (subscriptionType == SubscriptionType.ASSIGN) {
- updateAssignPullTask(assignedMessageQueue.messageQueues());
- }
+ startScheduleTask();
- scheduledExecutorService.scheduleAtFixedRate(
- new Runnable() {
- @Override
- public void run() {
- try {
- fetchTopicMessageQueuesAndCompare();
- } catch (Exception e) {
- log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
- }
- }
- }, 1000 * 20, 1000 * 30, TimeUnit.MILLISECONDS);
+ this.serviceState = ServiceState.RUNNING;
log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
- this.serviceState = ServiceState.RUNNING;
- for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
- Set messageQueues = fetchMessageQueues(topic);
- messageQueuesForTopic.put(topic, messageQueues);
- }
- this.mQClientFactory.checkClientInBroker();
+
+ operateAfterRunning();
+
break;
case RUNNING:
case START_FAILED:
@@ -323,19 +271,87 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
- private void checkConfig() throws MQClientException {
- // check consumerGroup
- Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
+ private void initMQClientFactory() throws MQClientException {
+ this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
+ boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
+ if (!registerOK) {
+ this.serviceState = ServiceState.CREATE_JUST;
- // consumerGroup
- if (null == this.defaultLitePullConsumer.getConsumerGroup()) {
- throw new MQClientException(
- "consumerGroup is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
+ }
+
+ private void initRebalanceImpl() {
+ this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
+ this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
+ this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
+ this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+ }
+
+ private void initPullAPIWrapper() {
+ this.pullAPIWrapper = new PullAPIWrapper(
+ mQClientFactory,
+ this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
+ this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+ }
- // consumerGroup
+ private void initOffsetStore() throws MQClientException {
+ if (this.defaultLitePullConsumer.getOffsetStore() != null) {
+ this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
+ } else {
+ switch (this.defaultLitePullConsumer.getMessageModel()) {
+ case BROADCASTING:
+ this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ case CLUSTERING:
+ this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+ this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
+ }
+ this.offsetStore.load();
+ }
+
+ private void startScheduleTask() {
+ scheduledExecutorService.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fetchTopicMessageQueuesAndCompare();
+ } catch (Exception e) {
+ log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
+ }
+ }
+ }, 1000 * 20, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ private void operateAfterRunning() throws MQClientException {
+ // If subscribe function invoke before start function, then update topic subscribe info after initialization.
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ // If assign function invoke before start function, then update pull task after initialization.
+ if (subscriptionType == SubscriptionType.ASSIGN) {
+ updateAssignPullTask(assignedMessageQueue.messageQueues());
+ }
+
+ for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
+ Set messageQueues = fetchMessageQueues(topic);
+ messageQueuesForTopic.put(topic, messageQueues);
+ }
+ this.mQClientFactory.checkClientInBroker();
+ }
+
+ private void checkConfig() throws MQClientException {
+ // Check consumerGroup
+ Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
+
+ // Check consumerGroup name is not equal default consumer group name.
if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
"consumerGroup can not equal "
@@ -345,7 +361,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null);
}
- // messageModel
+ // Check messageModel is not null.
if (null == this.defaultLitePullConsumer.getMessageModel()) {
throw new MQClientException(
"messageModel is null"
@@ -353,7 +369,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null);
}
- // allocateMessageQueueStrategy
+ // Check allocateMessageQueueStrategy is not null
if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
"allocateMessageQueueStrategy is null"
@@ -361,7 +377,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null);
}
- // allocateMessageQueueStrategy
if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
"Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
@@ -374,24 +389,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return pullAPIWrapper;
}
- private void copySubscription() throws MQClientException {
- try {
- switch (this.defaultLitePullConsumer.getMessageModel()) {
- case BROADCASTING:
- break;
- case CLUSTERING:
- /*
- * Retry topic will be support in the future.
- */
- break;
- default:
- break;
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
-
private void startPullTask(Collection mqSet) {
for (MessageQueue messageQueue : mqSet) {
if (!this.taskTable.containsKey(messageQueue)) {
@@ -526,7 +523,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} catch (InterruptedException ignore) {
}
- return null;
+
+ return Collections.emptyList();
}
public void pause(Collection messageQueues) {
@@ -547,29 +545,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
long minOffset = minOffset(messageQueue);
long maxOffset = maxOffset(messageQueue);
- if (offset < minOffset || offset > maxOffset)
+ if (offset < minOffset || offset > maxOffset) {
throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
- try {
- assignedMessageQueue.pause(Collections.singletonList(messageQueue));
- CountDownLatch2 pausedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
- if (pausedLatch != null) {
- pausedLatch.await(2, TimeUnit.SECONDS);
- }
- ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
- if (processQueue != null) {
- processQueue.clear();
- }
- Iterator iter = consumeRequestCache.iterator();
- while (iter.hasNext()) {
- if (iter.next().getMessageQueue().equals(messageQueue))
- iter.remove();
- }
+ }
+ final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
+ synchronized (objLock) {
assignedMessageQueue.setSeekOffset(messageQueue, offset);
- assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
- } catch (Exception e) {
- log.error("Seek offset failed.", e);
- } finally {
- assignedMessageQueue.resume(Collections.singletonList(messageQueue));
+ clearMessageQueueInCache(messageQueue);
}
}
@@ -611,8 +593,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
}
- if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING)
+ if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
+ }
} catch (Exception e) {
log.error("An error occurred when update consume offset synchronously.", e);
}
@@ -631,8 +614,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
}
- if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING)
+ if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
+ }
} catch (Exception e) {
log.error("An error occurred when update consume offset Automatically.");
}
@@ -665,13 +649,25 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return offset;
}
+ private void clearMessageQueueInCache(MessageQueue messageQueue) {
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null) {
+ processQueue.clear();
+ }
+ Iterator iter = consumeRequestCache.iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getMessageQueue().equals(messageQueue))
+ iter.remove();
+ }
+ }
+
private long nextPullOffset(MessageQueue messageQueue) {
long offset = -1;
long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) {
offset = seekOffset;
+ assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
assignedMessageQueue.setSeekOffset(messageQueue, -1);
- assignedMessageQueue.updatePullOffset(messageQueue, offset);
} else {
offset = assignedMessageQueue.getPullOffset(messageQueue);
if (offset == -1) {
@@ -679,11 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offset = 0;
}
- assignedMessageQueue.updatePullOffset(messageQueue, offset);
- assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
}
+ assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
}
-
return offset;
}
@@ -706,9 +700,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
- CountDownLatch2 pasuedLatch = assignedMessageQueue.getPausedLatch(messageQueue);
- if (pasuedLatch != null)
- pasuedLatch.countDown();
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
@@ -721,7 +712,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return;
}
- if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchNums() > defaultLitePullConsumer.getPullThresholdForAll()) {
+ if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0)
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
@@ -764,7 +755,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = 0;
try {
-
SubscriptionData subscriptionData;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic();
@@ -775,12 +765,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
topic, SubscriptionData.SUB_ALL);
}
- PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchNums());
+ PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
switch (pullResult.getPullStatus()) {
case FOUND:
- if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
- processQueue.putMessage(pullResult.getMsgFoundList());
- submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
+ synchronized (objLock) {
+ if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
+ processQueue.putMessage(pullResult.getMsgFoundList());
+ submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ }
}
break;
case OFFSET_ILLEGAL:
@@ -982,11 +975,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return offsetStore;
}
- public void registerFilterMessageHook(final FilterMessageHook hook) {
- this.filterMessageHookList.add(hook);
- log.info("register FilterMessageHook Hook, {}", hook.hookName());
- }
-
public DefaultLitePullConsumer getDefaultLitePullConsumer() {
return defaultLitePullConsumer;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 2a4fb7dfa6cedc44079425b5c455b85db56e9e1c..4f9d42cc57d1e8a131d238ffb7cc229641c3c30c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -246,10 +246,6 @@ public class MQClientInstance {
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
- case RUNNING:
- break;
- case SHUTDOWN_ALREADY:
- break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 8cceabcb2c2d2364306af1d0bc6b3ce3e494a2e1..cbcc739266fe9d8d872711acca60f1aadebbfb18 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -202,7 +202,6 @@ public class DefaultLitePullConsumerTest {
field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50);
- assertEquals(assignedMessageQueue.getConusmerOffset(messageQueue), 50);
litePullConsumer.shutdown();
}
@@ -219,6 +218,26 @@ public class DefaultLitePullConsumerTest {
}
}
+ @Test
+ public void testPauseAndResume_Success() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
+ try {
+ MessageQueue messageQueue = createMessageQueue();
+ litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.pause(Collections.singletonList(messageQueue));
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ List result = litePullConsumer.poll();
+ assertThat(result.isEmpty()).isTrue();
+ litePullConsumer.resume(Collections.singletonList(messageQueue));
+ result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
@@ -328,5 +347,4 @@ public class DefaultLitePullConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
-
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
index f0dc4d2adf9716bc26cd8b428068cbd5a8eb1623..1bfe49d7365e0460e3ef4bc3b290596a1f66867b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
@@ -31,9 +31,7 @@ public class LitePullConsumerSubscribe {
try {
while (running) {
List messageExts = litePullConsumer.poll();
- if (messageExts != null) {
- System.out.printf("%s%n", messageExts);
- }
+ System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();