diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 87c01a5b3055470be8d1772457afde58d9e55a95..c3e4efa252c1c8fb4e3d9b2d0214e2b5b965d736 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.client; +import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.UtilAll; @@ -95,7 +97,6 @@ public class ClientConfig { } } - public String withNamespace(String resource) { return NamespaceUtil.wrapNamespace(this.getNamespace(), resource); } @@ -124,9 +125,21 @@ public class ClientConfig { if (StringUtils.isEmpty(this.getNamespace())) { return queue; } - return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId()); } + + public Collection queuesWithNamespace(Collection queues) { + if (StringUtils.isEmpty(this.getNamespace())) { + return queues; + } + Iterator iter = queues.iterator(); + while (iter.hasNext()) { + MessageQueue queue = iter.next(); + queue.setTopic(withNamespace(queue.getTopic())); + } + return queues; + } + public void resetClientConfig(final ClientConfig cc) { this.namesrvAddr = cc.namesrvAddr; this.clientIP = cc.clientIP; @@ -170,6 +183,7 @@ public class ClientConfig { /** * Domain name mode access way does not support the delimiter(;), and only one domain name can be set. + * * @param namesrvAddr name server address */ public void setNamesrvAddr(String namesrvAddr) { diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 5d6acc0f872bf645ecc28cc1665ab94c8c34725d..1b96cd058ff129dc01d1d18aff44c0f0a6e8da1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -53,14 +53,17 @@ public class Validators { if (UtilAll.isBlank(group)) { throw new MQClientException("the specified group is blank", null); } + + if (group.length() > CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified group is longer than group max length 255.", null); + } + if (!regularExpressionMatcher(group, PATTERN)) { throw new MQClientException(String.format( "the specified group[%s] contains illegal characters, allowing only %s", group, VALID_PATTERN_STR), null); } - if (group.length() > CHARACTER_MAX_LENGTH) { - throw new MQClientException("the specified group is longer than group max length 255.", null); - } + } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..99976d55d3ad90bd5514008b201482ca726458fd --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Collection; +import java.util.List; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RPCHook; + +public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { + + private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; + + /** + * Consumers belonging to the same consumer group share a group id. The consumers in a group then + * divides the topic as fairly amongst themselves as possible by establishing that each queue is only + * consumed by a single consumer from the group. If all consumers are from the same group, it functions + * as a traditional message queue. Each message would be consumed by one consumer of the group only. + * When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional + * publish-subscribe model. The messages are broadcast to all consumer groups. + */ + private String consumerGroup; + + /** + * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify + */ + private long brokerSuspendMaxTimeMillis = 1000 * 20; + + /** + * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not + * recommended to modify + */ + private long consumerTimeoutMillisWhenSuspend = 1000 * 30; + + /** + * The socket timeout in milliseconds + */ + private long consumerPullTimeoutMillis = 1000 * 10; + + /** + * Consumption pattern,default is clustering + */ + private MessageModel messageModel = MessageModel.CLUSTERING; + /** + * Message queue listener + */ + private MessageQueueListener messageQueueListener; + /** + * Offset Storage + */ + private OffsetStore offsetStore; + + /** + * Queue allocation algorithm + */ + private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + /** + * Whether the unit of subscription group + */ + private boolean unitMode = false; + + /** + * The flag for auto commit offset + */ + private boolean autoCommit = true; + + /** + * Pull thread number + */ + private int pullThreadNums = 20; + + /** + * Maximum commit offset interval time in milliseconds. + */ + private long autoCommitIntervalMillis = 5 * 1000; + + /** + * Maximum number of messages pulled each time. + */ + private int pullBatchSize = 10; + + /** + * Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default. + * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit + */ + private long pullThresholdForAll = 10000; + + /** + * Consume max span offset. + */ + private int consumeMaxSpan = 2000; + + /** + * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider + * the {@code pullBatchSize}, the instantaneous value may exceed the limit + */ + private int pullThresholdForQueue = 1000; + + /** + * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, + * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit + * + *

+ * The size of a message only measured by message body, so it's not accurate + */ + private int pullThresholdSizeForQueue = 100; + + /** + * The poll timeout in milliseconds + */ + private long pollTimeoutMillis = 1000 * 5; + + /** + * Interval time in in milliseconds for checking changes in topic metadata. + */ + private long topicMetadataCheckIntervalMillis = 30 * 1000; + + /** + * Default constructor. + */ + public DefaultLitePullConsumer() { + this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); + } + + /** + * Constructor specifying consumer group. + * + * @param consumerGroup Consumer group. + */ + public DefaultLitePullConsumer(final String consumerGroup) { + this(null, consumerGroup, null); + } + + /** + * Constructor specifying RPC hook. + * + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer(RPCHook rpcHook) { + this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + } + + /** + * Constructor specifying consumer group, RPC hook + * + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { + this(null, consumerGroup, rpcHook); + } + + /** + * Constructor specifying namespace, consumer group and RPC hook. + * + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { + this.namespace = namespace; + this.consumerGroup = consumerGroup; + defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); + } + + @Override + public void start() throws MQClientException { + this.defaultLitePullConsumerImpl.start(); + } + + @Override + public void shutdown() { + this.defaultLitePullConsumerImpl.shutdown(); + } + + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); + } + + @Override + public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); + } + + @Override + public void unsubscribe(String topic) { + this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); + } + + @Override + public void assign(Collection messageQueues) { + defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); + } + + @Override + public List poll() { + return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); + } + + @Override + public List poll(long timeout) { + return defaultLitePullConsumerImpl.poll(timeout); + } + + @Override + public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); + } + + @Override + public void pause(Collection messageQueues) { + this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); + } + + @Override + public void resume(Collection messageQueues) { + this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); + } + + @Override + public Collection fetchMessageQueues(String topic) throws MQClientException { + return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); + } + + @Override + public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { + return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp); + } + + @Override + public void registerTopicMessageQueueChangeListener(String topic, + TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException { + this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener); + } + + @Override + public void commitSync() { + this.defaultLitePullConsumerImpl.commitSync(); + } + + @Override + public Long committed(MessageQueue messageQueue) throws MQClientException { + return this.defaultLitePullConsumerImpl.committed(messageQueue); + } + + @Override + public boolean isAutoCommit() { + return autoCommit; + } + + @Override + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + + public int getPullThreadNums() { + return pullThreadNums; + } + + public void setPullThreadNums(int pullThreadNums) { + this.pullThreadNums = pullThreadNums; + } + + public long getAutoCommitIntervalMillis() { + return autoCommitIntervalMillis; + } + + public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) { + this.autoCommitIntervalMillis = autoCommitIntervalMillis; + } + + public int getPullBatchSize() { + return pullBatchSize; + } + + public void setPullBatchSize(int pullBatchSize) { + this.pullBatchSize = pullBatchSize; + } + + public long getPullThresholdForAll() { + return pullThresholdForAll; + } + + public void setPullThresholdForAll(long pullThresholdForAll) { + this.pullThresholdForAll = pullThresholdForAll; + } + + public int getConsumeMaxSpan() { + return consumeMaxSpan; + } + + public void setConsumeMaxSpan(int consumeMaxSpan) { + this.consumeMaxSpan = consumeMaxSpan; + } + + public int getPullThresholdForQueue() { + return pullThresholdForQueue; + } + + public void setPullThresholdForQueue(int pullThresholdForQueue) { + this.pullThresholdForQueue = pullThresholdForQueue; + } + + public int getPullThresholdSizeForQueue() { + return pullThresholdSizeForQueue; + } + + public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) { + this.pullThresholdSizeForQueue = pullThresholdSizeForQueue; + } + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + public long getBrokerSuspendMaxTimeMillis() { + return brokerSuspendMaxTimeMillis; + } + + public long getPollTimeoutMillis() { + return pollTimeoutMillis; + } + + public void setPollTimeoutMillis(long pollTimeoutMillis) { + this.pollTimeoutMillis = pollTimeoutMillis; + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + public boolean isUnitMode() { + return unitMode; + } + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + public void setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } + + public long getConsumerPullTimeoutMillis() { + return consumerPullTimeoutMillis; + } + + public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) { + this.consumerPullTimeoutMillis = consumerPullTimeoutMillis; + } + + public long getConsumerTimeoutMillisWhenSuspend() { + return consumerTimeoutMillisWhenSuspend; + } + + public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { + this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; + } + + public long getTopicMetadataCheckIntervalMillis() { + return topicMetadataCheckIntervalMillis; + } + + public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) { + this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f3b6caaa71f23861f8bc767e5f64178cef1ad752..0876a94e4c536b9691840373c5daa2a2c6aa9463 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -35,9 +35,13 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; /** - * Default pulling consumer + * Default pulling consumer. + * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use + * in the scenario of actively pulling messages. */ +@Deprecated public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer { + protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..d6e657ff18cf491b5893fae0da127fe90f19aa75 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Collection; +import java.util.List; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public interface LitePullConsumer { + + /** + * Start the consumer + */ + void start() throws MQClientException; + + /** + * Shutdown the consumer + */ + void shutdown(); + + /** + * Subscribe some topic with subExpression + * + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if + * null or * expression,meaning subscribe all + * @throws MQClientException if there is any client error. + */ + void subscribe(final String topic, final String subExpression) throws MQClientException; + + /** + * Subscribe some topic with selector. + * + * @param selector message selector({@link MessageSelector}), can be null. + * @throws MQClientException if there is any client error. + */ + void subscribe(final String topic, final MessageSelector selector) throws MQClientException; + + /** + * Unsubscribe consumption some topic + * + * @param topic Message topic that needs to be unsubscribe. + */ + void unsubscribe(final String topic); + + /** + * Manually assign a list of message queues to this consumer. This interface does not allow for incremental + * assignment and will replace the previous assignment (if there is one). + * + * @param messageQueues Message queues that needs to be assigned. + */ + void assign(Collection messageQueues); + + /** + * Fetch data for the topics or partitions specified using assign API + * + * @return list of message, can be null. + */ + List poll(); + + /** + * Fetch data for the topics or partitions specified using assign API + * + * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be + * negative + * @return list of message, can be null. + */ + List poll(long timeout); + + /** + * Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same + * message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if + * this API is arbitrarily used in the middle of consumption. + * + * @param messageQueue + * @param offset + */ + void seek(MessageQueue messageQueue, long offset) throws MQClientException; + + /** + * Suspend pulling from the requested message queues. + * + * Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the + * messages of the requested message queues drain. + * + * Note that this method does not affect message queue subscription. In particular, it does not cause a group + * rebalance. + * + * @param messageQueues Message queues that needs to be paused. + */ + void pause(Collection messageQueues); + + /** + * Resume specified message queues which have been paused with {@link #pause(Collection)}. + * + * @param messageQueues Message queues that needs to be resumed. + */ + void resume(Collection messageQueues); + + /** + * Whether to enable auto-commit consume offset. + * + * @return true if enable auto-commit, false if disable auto-commit. + */ + boolean isAutoCommit(); + + /** + * Set whether to enable auto-commit consume offset. + * + * @param autoCommit Whether to enable auto-commit. + */ + void setAutoCommit(boolean autoCommit); + + /** + * Get metadata about the message queues for a given topic. + * + * @param topic The topic that need to get metadata. + * @return collection of message queues + * @throws MQClientException if there is any client error. + */ + Collection fetchMessageQueues(String topic) throws MQClientException; + + /** + * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the + * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message + * queue. + * + * @param messageQueue Message queues that needs to get offset by timestamp. + * @param timestamp + * @return offset + * @throws MQClientException if there is any client error. + */ + Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException; + + /** + * Manually commit consume offset. + */ + void commitSync(); + + /** + * Get the last committed offset for the given message queue. + * + * @param messageQueue + * @return offset, if offset equals -1 means no offset in broker. + * @throws MQClientException if there is any client error. + */ + Long committed(MessageQueue messageQueue) throws MQClientException; + + /** + * Register a callback for sensing topic metadata changes. + * + * @param topic The topic that need to monitor. + * @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link + * TopicMessageQueueChangeListener} + * @throws MQClientException if there is any client error. + */ + void registerTopicMessageQueueChangeListener(String topic, + TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 28b807c2ed89ac460d9cfd0e61a35dc144c42242..a8e96283f9d06c11a29197d1f62f0c494abb5d56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -169,4 +169,5 @@ public interface MQPullConsumer extends MQConsumer { */ void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 5436688052cc52634782d5f4f344933470159973..4d57313f8fa215b6b0fba0b3239c13b01d7425b3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -32,7 +32,9 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; /** - * Schedule service for pull consumer + * Schedule service for pull consumer. + * This Consumer will be removed in 2022, and a better implementation {@link + * DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages. */ public class MQPullConsumerScheduleService { private final InternalLogger log = ClientLogger.getLog(); @@ -157,7 +159,7 @@ public class MQPullConsumerScheduleService { } } - class PullTaskImpl implements Runnable { + public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java new file mode 100644 index 0000000000000000000000000000000000000000..fa6fd134e230118654eac42d68ee5296305c0e06 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/TopicMessageQueueChangeListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Set; +import org.apache.rocketmq.common.message.MessageQueue; + +public interface TopicMessageQueueChangeListener { + /** + * This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is + * expanded or shrunk. + * + * @param messageQueues + */ + void onChanged(String topic, Set messageQueues); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index c1524e10777981e25ef03326ef0abbc8727041e8..6b762383717bd01889b355334540f0fca061d6ec 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore { return; final HashSet unusedMQ = new HashSet(); - if (!mqs.isEmpty()) { - for (Map.Entry entry : this.offsetTable.entrySet()) { - MessageQueue mq = entry.getKey(); - AtomicLong offset = entry.getValue(); - if (offset != null) { - if (mqs.contains(mq)) { - try { - this.updateConsumeOffsetToBroker(mq, offset.get()); - log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", - this.groupName, - this.mQClientFactory.getClientId(), - mq, - offset.get()); - } catch (Exception e) { - log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); - } - } else { - unusedMQ.add(mq); + + for (Map.Entry entry : this.offsetTable.entrySet()) { + MessageQueue mq = entry.getKey(); + AtomicLong offset = entry.getValue(); + if (offset != null) { + if (mqs.contains(mq)) { + try { + this.updateConsumeOffsetToBroker(mq, offset.get()); + log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + this.groupName, + this.mQClientFactory.getClientId(), + mq, + offset.get()); + } catch (Exception e) { + log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } + } else { + unusedMQ.add(mq); } } } @@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } /** - * Update the Consumer Offset in one way, once the Master is off, updated to Slave, - * here need to be optimized. + * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized. */ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -196,15 +194,13 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } /** - * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, - * here need to be optimized. + * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */ @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..c0c6f6030c2599457aef833f389e62a22fcee296 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.message.MessageQueue; + +public class AssignedMessageQueue { + + private final ConcurrentHashMap assignedMessageQueueState; + + private RebalanceImpl rebalanceImpl; + + public AssignedMessageQueue() { + assignedMessageQueueState = new ConcurrentHashMap(); + } + + public void setRebalanceImpl(RebalanceImpl rebalanceImpl) { + this.rebalanceImpl = rebalanceImpl; + } + + public Set messageQueues() { + return assignedMessageQueueState.keySet(); + } + + public boolean isPaused(MessageQueue messageQueue) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + return messageQueueState.isPaused(); + } + return true; + } + + public void pause(Collection messageQueues) { + for (MessageQueue messageQueue : messageQueues) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (assignedMessageQueueState.get(messageQueue) != null) { + messageQueueState.setPaused(true); + } + } + } + + public void resume(Collection messageQueueCollection) { + for (MessageQueue messageQueue : messageQueueCollection) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (assignedMessageQueueState.get(messageQueue) != null) { + messageQueueState.setPaused(false); + } + } + } + + public ProcessQueue getProcessQueue(MessageQueue messageQueue) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + return messageQueueState.getProcessQueue(); + } + return null; + } + + public long getPullOffset(MessageQueue messageQueue) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + return messageQueueState.getPullOffset(); + } + return -1; + } + + public void updatePullOffset(MessageQueue messageQueue, long offset) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + messageQueueState.setPullOffset(offset); + } + } + + public long getConusmerOffset(MessageQueue messageQueue) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + return messageQueueState.getConsumeOffset(); + } + return -1; + } + + public void updateConsumeOffset(MessageQueue messageQueue, long offset) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + messageQueueState.setConsumeOffset(offset); + } + } + + public void setSeekOffset(MessageQueue messageQueue, long offset) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + messageQueueState.setSeekOffset(offset); + } + } + + public long getSeekOffset(MessageQueue messageQueue) { + MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); + if (messageQueueState != null) { + return messageQueueState.getSeekOffset(); + } + return -1; + } + + public void updateAssignedMessageQueue(String topic, Collection assigned) { + synchronized (this.assignedMessageQueueState) { + Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!assigned.contains(next.getKey())) { + next.getValue().getProcessQueue().setDropped(true); + it.remove(); + } + } + } + addAssignedMessageQueue(assigned); + } + } + + public void updateAssignedMessageQueue(Collection assigned) { + synchronized (this.assignedMessageQueueState) { + Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (!assigned.contains(next.getKey())) { + next.getValue().getProcessQueue().setDropped(true); + it.remove(); + } + } + addAssignedMessageQueue(assigned); + } + } + + private void addAssignedMessageQueue(Collection assigned) { + for (MessageQueue messageQueue : assigned) { + if (!this.assignedMessageQueueState.containsKey(messageQueue)) { + MessageQueueState messageQueueState; + if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) { + messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue)); + } else { + ProcessQueue processQueue = new ProcessQueue(); + messageQueueState = new MessageQueueState(messageQueue, processQueue); + } + this.assignedMessageQueueState.put(messageQueue, messageQueueState); + } + } + } + + public void removeAssignedMessageQueue(String topic) { + synchronized (this.assignedMessageQueueState) { + Iterator> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + it.remove(); + } + } + } + } + + private class MessageQueueState { + private MessageQueue messageQueue; + private ProcessQueue processQueue; + private volatile boolean paused = false; + private volatile long pullOffset = -1; + private volatile long consumeOffset = -1; + private volatile long seekOffset = -1; + + private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) { + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public boolean isPaused() { + return paused; + } + + public void setPaused(boolean paused) { + this.paused = paused; + } + + public long getPullOffset() { + return pullOffset; + } + + public void setPullOffset(long pullOffset) { + this.pullOffset = pullOffset; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + public void setProcessQueue(ProcessQueue processQueue) { + this.processQueue = processQueue; + } + + public long getConsumeOffset() { + return consumeOffset; + } + + public void setConsumeOffset(long consumeOffset) { + this.consumeOffset = consumeOffset; + } + + public long getSeekOffset() { + return seekOffset; + } + + public void setSeekOffset(long seekOffset) { + this.seekOffset = seekOffset; + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..5217a315398a2d6965933e8e70a1e453e6405edf --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -0,0 +1,1074 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.FilterMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class DefaultLitePullConsumerImpl implements MQConsumerInner { + + private final InternalLogger log = ClientLogger.getLog(); + + private final long consumerStartTimestamp = System.currentTimeMillis(); + + private final RPCHook rpcHook; + + private final ArrayList filterMessageHookList = new ArrayList(); + + private volatile ServiceState serviceState = ServiceState.CREATE_JUST; + + protected MQClientInstance mQClientFactory; + + private PullAPIWrapper pullAPIWrapper; + + private OffsetStore offsetStore; + + private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this); + + private enum SubscriptionType { + NONE, SUBSCRIBE, ASSIGN + } + + private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; + + private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; + /** + * the type of subscription + */ + private SubscriptionType subscriptionType = SubscriptionType.NONE; + /** + * Delay some time when exception occur + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000; + /** + * Flow control interval + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + /** + * Delay some time when suspend pull service + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + + private DefaultLitePullConsumer defaultLitePullConsumer; + + private final ConcurrentMap taskTable = + new ConcurrentHashMap(); + + private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue(); + + private final BlockingQueue consumeRequestCache = new LinkedBlockingQueue(); + + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private final ScheduledExecutorService scheduledExecutorService; + + private Map topicMessageQueueChangeListenerMap = new HashMap(); + + private Map> messageQueuesForTopic = new HashMap>(); + + private long consumeRequestFlowControlTimes = 0L; + + private long queueFlowControlTimes = 0L; + + private long queueMaxSpanFlowControlTimes = 0L; + + private long nextAutoCommitDeadline = -1L; + + private final MessageQueueLock messageQueueLock = new MessageQueueLock(); + + public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { + this.defaultLitePullConsumer = defaultLitePullConsumer; + this.rpcHook = rpcHook; + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( + this.defaultLitePullConsumer.getPullThreadNums(), + new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) + ); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "MonitorMessageQueueChangeThread"); + } + }); + } + + private void checkServiceState() { + if (this.serviceState != ServiceState.RUNNING) + throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); + } + + private synchronized void setSubscriptionType(SubscriptionType type) { + if (this.subscriptionType == SubscriptionType.NONE) + this.subscriptionType = type; + else if (this.subscriptionType != type) + throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); + } + + private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) { + this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue); + } + + private void updatePullTask(String topic, Set mqNewSet) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + startPullTask(mqNewSet); + } + + class MessageQueueListenerImpl implements MessageQueueListener { + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { + MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); + switch (messageModel) { + case BROADCASTING: + updateAssignedMessageQueue(topic, mqAll); + updatePullTask(topic, mqAll); + break; + case CLUSTERING: + updateAssignedMessageQueue(topic, mqDivided); + updatePullTask(topic, mqDivided); + break; + default: + break; + } + } + } + + private int nextPullBatchSize() { + return Math.min(this.defaultLitePullConsumer.getPullBatchSize(), consumeRequestCache.remainingCapacity()); + } + + public synchronized void shutdown() { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + persistConsumerOffset(); + this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup()); + scheduledThreadPoolExecutor.shutdown(); + scheduledExecutorService.shutdown(); + this.mQClientFactory.shutdown(); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup()); + break; + default: + break; + } + } + + public synchronized void start() throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { + this.defaultLitePullConsumer.changeInstanceNameToPID(); + } + + initMQClientFactory(); + + initRebalanceImpl(); + + initPullAPIWrapper(); + + initOffsetStore(); + + mQClientFactory.start(); + + startScheduleTask(); + + this.serviceState = ServiceState.RUNNING; + + log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); + + operateAfterRunning(); + + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + } + + private void initMQClientFactory() throws MQClientException { + this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); + boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + + throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup() + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + } + + private void initRebalanceImpl() { + this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup()); + this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel()); + this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()); + this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); + } + + private void initPullAPIWrapper() { + this.pullAPIWrapper = new PullAPIWrapper( + mQClientFactory, + this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode()); + this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); + } + + private void initOffsetStore() throws MQClientException { + if (this.defaultLitePullConsumer.getOffsetStore() != null) { + this.offsetStore = this.defaultLitePullConsumer.getOffsetStore(); + } else { + switch (this.defaultLitePullConsumer.getMessageModel()) { + case BROADCASTING: + this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup()); + break; + case CLUSTERING: + this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup()); + break; + default: + break; + } + this.defaultLitePullConsumer.setOffsetStore(this.offsetStore); + } + this.offsetStore.load(); + } + + private void startScheduleTask() { + scheduledExecutorService.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + try { + fetchTopicMessageQueuesAndCompare(); + } catch (Exception e) { + log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); + } + } + }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS); + } + + private void operateAfterRunning() throws MQClientException { + // If subscribe function invoke before start function, then update topic subscribe info after initialization. + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + // If assign function invoke before start function, then update pull task after initialization. + if (subscriptionType == SubscriptionType.ASSIGN) { + updateAssignPullTask(assignedMessageQueue.messageQueues()); + } + + for (String topic : topicMessageQueueChangeListenerMap.keySet()) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } + this.mQClientFactory.checkClientInBroker(); + } + + private void checkConfig() throws MQClientException { + // Check consumerGroup + Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup()); + + // Check consumerGroup name is not equal default consumer group name. + if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { + throw new MQClientException( + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // Check messageModel is not null. + if (null == this.defaultLitePullConsumer.getMessageModel()) { + throw new MQClientException( + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // Check allocateMessageQueueStrategy is not null + if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) { + throw new MQClientException( + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) { + throw new MQClientException( + "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + public PullAPIWrapper getPullAPIWrapper() { + return pullAPIWrapper; + } + + private void startPullTask(Collection mqSet) { + for (MessageQueue messageQueue : mqSet) { + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } + } + } + + private void updateAssignPullTask(Collection mqNewSet) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + + startPullTask(mqNewSet); + } + + private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map subTable = rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + } + } + } + + public synchronized void subscribe(String topic, String subExpression) throws MQClientException { + try { + if (topic == null || topic.equals("")) { + throw new IllegalArgumentException("Topic can not be null or empty."); + } + setSubscriptionType(SubscriptionType.SUBSCRIBE); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), + topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + try { + if (topic == null || topic.equals("")) { + throw new IllegalArgumentException("Topic can not be null or empty."); + } + setSubscriptionType(SubscriptionType.SUBSCRIBE); + if (messageSelector == null) { + subscribe(topic, SubscriptionData.SUB_ALL); + return; + } + SubscriptionData subscriptionData = FilterAPI.build(topic, + messageSelector.getExpression(), messageSelector.getExpressionType()); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void unsubscribe(final String topic) { + this.rebalanceImpl.getSubscriptionInner().remove(topic); + removePullTaskCallback(topic); + assignedMessageQueue.removeAssignedMessageQueue(topic); + } + + public synchronized void assign(Collection messageQueues) { + if (messageQueues == null || messageQueues.isEmpty()) { + throw new IllegalArgumentException("Message queues can not be null or empty."); + } + setSubscriptionType(SubscriptionType.ASSIGN); + assignedMessageQueue.updateAssignedMessageQueue(messageQueues); + if (serviceState == ServiceState.RUNNING) { + updateAssignPullTask(messageQueues); + } + } + + private void maybeAutoCommit() { + long now = System.currentTimeMillis(); + if (now >= nextAutoCommitDeadline) { + commitAll(); + nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis(); + } + } + + public synchronized List poll(long timeout) { + try { + checkServiceState(); + if (timeout < 0) + throw new IllegalArgumentException("Timeout must not be negative"); + + if (defaultLitePullConsumer.isAutoCommit()) { + maybeAutoCommit(); + } + long endTime = System.currentTimeMillis() + timeout; + + ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + if (endTime - System.currentTimeMillis() > 0) { + while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { + consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (endTime - System.currentTimeMillis() <= 0) + break; + } + } + + if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { + List messages = consumeRequest.getMessageExts(); + long offset = consumeRequest.getProcessQueue().removeMessage(messages); + assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + //If namespace not null , reset Topic without namespace. + this.resetTopic(messages); + return messages; + } + } catch (InterruptedException ignore) { + + } + + return Collections.emptyList(); + } + + public void pause(Collection messageQueues) { + assignedMessageQueue.pause(messageQueues); + } + + public void resume(Collection messageQueues) { + assignedMessageQueue.resume(messageQueues); + } + + public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException { + if (!assignedMessageQueue.messageQueues().contains(messageQueue)) { + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null); + } else { + throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null); + } + } + long minOffset = minOffset(messageQueue); + long maxOffset = maxOffset(messageQueue); + if (offset < minOffset || offset > maxOffset) { + throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null); + } + final Object objLock = messageQueueLock.fetchLockObject(messageQueue); + synchronized (objLock) { + assignedMessageQueue.setSeekOffset(messageQueue, offset); + clearMessageQueueInCache(messageQueue); + } + } + + private long maxOffset(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue); + } + + private long minOffset(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue); + } + + private void removePullTaskCallback(final String topic) { + removePullTask(topic); + } + + private void removePullTask(final String topic) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + + public synchronized void commitSync() { + try { + for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { + long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue); + if (consumerOffset != -1) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); + if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { + updateConsumeOffset(messageQueue, consumerOffset); + updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); + } + } + } + if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { + offsetStore.persistAll(assignedMessageQueue.messageQueues()); + } + } catch (Exception e) { + log.error("An error occurred when update consume offset synchronously.", e); + } + } + + private synchronized void commitAll() { + try { + for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { + long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue); + if (consumerOffset != -1) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); + if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { + updateConsumeOffset(messageQueue, consumerOffset); + updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); + } + } + } + if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { + offsetStore.persistAll(assignedMessageQueue.messageQueues()); + } + } catch (Exception e) { + log.error("An error occurred when update consume offset Automatically."); + } + } + + private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) { + if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) { + assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset); + } + } + + private void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException e) { + log.error("Submit consumeRequest error", e); + } + } + + private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) { + checkServiceState(); + return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE); + } + + public long committed(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); + if (offset == -2) + throw new MQClientException("Fetch consume offset from broker exception", null); + return offset; + } + + private void clearMessageQueueInCache(MessageQueue messageQueue) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + if (processQueue != null) { + processQueue.clear(); + } + Iterator iter = consumeRequestCache.iterator(); + while (iter.hasNext()) { + if (iter.next().getMessageQueue().equals(messageQueue)) + iter.remove(); + } + } + + private long nextPullOffset(MessageQueue messageQueue) { + long offset = -1; + long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); + if (seekOffset != -1) { + offset = seekOffset; + assignedMessageQueue.updateConsumeOffset(messageQueue, offset); + assignedMessageQueue.setSeekOffset(messageQueue, -1); + } else { + offset = assignedMessageQueue.getPullOffset(messageQueue); + if (offset == -1) { + offset = fetchConsumeOffset(messageQueue, false); + if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { + offset = 0; + } + } + assignedMessageQueue.updateConsumeOffset(messageQueue, offset); + } + return offset; + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + public class PullTaskImpl implements Runnable { + private final MessageQueue messageQueue; + private volatile boolean cancelled = false; + + public PullTaskImpl(final MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void run() { + + if (!this.isCancelled()) { + + if (assignedMessageQueue.isPaused(messageQueue)) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); + log.debug("Message Queue: {} has been paused!", messageQueue); + return; + } + + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + + if (processQueue == null && processQueue.isDropped()) { + log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); + return; + } + + if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((consumeRequestFlowControlTimes++ % 1000) == 0) + log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); + return; + } + + long cachedMessageCount = processQueue.getMsgCount().get(); + long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + + if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + } + return; + } + + if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes); + } + return; + } + + if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { + scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { + log.warn( + "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes); + } + return; + } + + long offset = nextPullOffset(messageQueue); + long pullDelayTimeMills = 0; + try { + SubscriptionData subscriptionData; + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + String topic = this.messageQueue.getTopic(); + subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); + } else { + String topic = this.messageQueue.getTopic(); + subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(), + topic, SubscriptionData.SUB_ALL); + } + + PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize()); + switch (pullResult.getPullStatus()) { + case FOUND: + final Object objLock = messageQueueLock.fetchLockObject(messageQueue); + synchronized (objLock) { + if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) { + processQueue.putMessage(pullResult.getMsgFoundList()); + submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); + } + } + break; + case OFFSET_ILLEGAL: + log.warn("The pull request offset illegal, {}", pullResult.toString()); + break; + default: + break; + } + updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + } catch (Throwable e) { + pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; + log.error("An error occurred in pull message process.", e); + } + + if (!this.isCancelled()) { + scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS); + } else { + log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); + } + } + } + + public boolean isCancelled() { + return cancelled; + } + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + } + + private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis()); + } + + private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); + } + + private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, + boolean block, + long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + if (offset < 0) { + throw new MQClientException("offset < 0", null); + } + + if (maxNums <= 0) { + throw new MQClientException("maxNums <= 0", null); + } + + int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); + + long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; + + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); + PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + subscriptionData.getExpressionType(), + isTagType ? 0L : subscriptionData.getSubVersion(), + offset, + maxNums, + sysFlag, + 0, + this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.SYNC, + null + ); + this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); + return pullResult; + } + + private void resetTopic(List msgList) { + if (null == msgList || msgList.size() == 0) { + return; + } + + //If namespace not null , reset Topic without namespace. + for (MessageExt messageExt : msgList) { + if (null != this.defaultLitePullConsumer.getNamespace()) { + messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultLitePullConsumer.getNamespace())); + } + } + + } + + public void updateConsumeOffset(MessageQueue mq, long offset) { + checkServiceState(); + this.offsetStore.updateOffset(mq, offset, false); + } + + @Override + public String groupName() { + return this.defaultLitePullConsumer.getConsumerGroup(); + } + + @Override + public MessageModel messageModel() { + return this.defaultLitePullConsumer.getMessageModel(); + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_ACTIVELY; + } + + @Override + public ConsumeFromWhere consumeFromWhere() { + return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + } + + @Override + public Set subscriptions() { + Set subSet = new HashSet(); + + subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); + + return subSet; + } + + @Override + public void doRebalance() { + if (this.rebalanceImpl != null) { + this.rebalanceImpl.doRebalance(false); + } + } + + @Override + public void persistConsumerOffset() { + try { + checkServiceState(); + Set mqs = new HashSet(); + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + this.offsetStore.persistAll(mqs); + } catch (Exception e) { + log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + } + } + + @Override + public void updateTopicSubscribeInfo(String topic, Set info) { + Map subTable = this.rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info); + } + } + } + + @Override + public boolean isSubscribeTopicNeedUpdate(String topic) { + Map subTable = this.rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); + } + } + + return false; + } + + @Override + public boolean isUnitMode() { + return this.defaultLitePullConsumer.isUnitMode(); + } + + @Override + public ConsumerRunningInfo consumerRunningInfo() { + ConsumerRunningInfo info = new ConsumerRunningInfo(); + + Properties prop = MixAll.object2Properties(this.defaultLitePullConsumer); + prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp)); + info.setProperties(prop); + + info.getSubscriptionSet().addAll(this.subscriptions()); + return info; + } + + private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public DefaultLitePullConsumer getDefaultLitePullConsumer() { + return defaultLitePullConsumer; + } + + public Set fetchMessageQueues(String topic) throws MQClientException { + checkServiceState(); + Set result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + return parseMessageQueues(result); + } + + private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException { + for (Map.Entry entry : topicMessageQueueChangeListenerMap.entrySet()) { + String topic = entry.getKey(); + TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue(); + Set oldMessageQueues = messageQueuesForTopic.get(topic); + Set newMessageQueues = fetchMessageQueues(topic); + boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues); + if (isChanged) { + messageQueuesForTopic.put(topic, newMessageQueues); + if (topicMessageQueueChangeListener != null) { + topicMessageQueueChangeListener.onChanged(topic, newMessageQueues); + } + } + } + } + + private boolean isSetEqual(Set set1, Set set2) { + if (set1 == null && set2 == null) { + return true; + } + + if (set1 == null || set2 == null || set1.size() != set2.size() + || set1.size() == 0 || set2.size() == 0) { + return false; + } + + Iterator iter = set2.iterator(); + boolean isEqual = true; + while (iter.hasNext()) { + if (!set1.contains(iter.next())) { + isEqual = false; + } + } + return isEqual; + } + + public synchronized void registerTopicMessageQueueChangeListener(String topic, + TopicMessageQueueChangeListener listener) throws MQClientException { + if (topic == null || listener == null) { + throw new MQClientException("Topic or listener is null", null); + } + if (topicMessageQueueChangeListenerMap.containsKey(topic)) { + log.warn("Topic {} had been registered, new listener will overwrite the old one", topic); + } + topicMessageQueueChangeListenerMap.put(topic, listener); + if (this.serviceState == ServiceState.RUNNING) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } + } + + private Set parseMessageQueues(Set queueSet) { + Set resultQueues = new HashSet(); + for (MessageQueue messageQueue : queueSet) { + String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(), + this.defaultLitePullConsumer.getNamespace()); + resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId())); + } + return resultQueues; + } + + public class ConsumeRequest { + private final List messageExts; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + + public ConsumeRequest(final List messageExts, final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExts = messageExts; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public List getMessageExts() { + return messageExts; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 598dc94cc26a5984345e4cf62b6dbd53028263c4..c12d8357eae3bc365266a72a476a01be0efa3dd0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -66,6 +66,11 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; +/** + * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use + * in the scenario of actively pulling messages. + */ +@Deprecated public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPullConsumer defaultMQPullConsumer; @@ -74,7 +79,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final ArrayList consumeMessageHookList = new ArrayList(); private final ArrayList filterMessageHookList = new ArrayList(); private volatile ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; + protected MQClientInstance mQClientFactory; private PullAPIWrapper pullAPIWrapper; private OffsetStore offsetStore; private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 0a52817ced7d9350c9a38dfc3fe358c193307364..092da9aa33e1997639ed311e822397bec7c1d053 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.logging.InternalLogger; @@ -431,4 +432,5 @@ public class ProcessQueue { public void setLastConsumeTimestamp(long lastConsumeTimestamp) { this.lastConsumeTimestamp = lastConsumeTimestamp; } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 9ad07c7e4433548d64693c294e2b1bf58ea0e530..146fce6e1e3b3c7c311074efdca9d718a5a261f3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -41,8 +41,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; /** - * Base class for rebalance algorithm + * This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use + * in the scenario of actively pulling messages. */ +@Deprecated public abstract class RebalanceImpl { protected static final InternalLogger log = ClientLogger.getLog(); protected final ConcurrentMap processQueueTable = new ConcurrentHashMap(64); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..0b8ec67778f2d86034c3573a4b69de417f89c539 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import java.util.List; +import java.util.Set; + +public class RebalanceLitePullImpl extends RebalanceImpl { + + private final DefaultLitePullConsumerImpl litePullConsumerImpl; + + public RebalanceLitePullImpl(DefaultLitePullConsumerImpl litePullConsumerImpl) { + this(null, null, null, null, litePullConsumerImpl); + } + + public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) { + super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); + this.litePullConsumerImpl = litePullConsumerImpl; + } + + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { + MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener(); + if (messageQueueListener != null) { + try { + messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); + } catch (Throwable e) { + log.error("messageQueueChanged exception", e); + } + } + } + + @Override + public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { + this.litePullConsumerImpl.getOffsetStore().persist(mq); + this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); + return true; + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_ACTIVELY; + } + + @Override + public void removeDirtyOffset(final MessageQueue mq) { + this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); + } + + @Override + public long computePullFromWhere(MessageQueue mq) { + return 0; + } + + @Override + public void dispatchPullRequest(List pullRequestList) { + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 2a4fb7dfa6cedc44079425b5c455b85db56e9e1c..4f9d42cc57d1e8a131d238ffb7cc229641c3c30c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -246,10 +246,6 @@ public class MQClientInstance { log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; - case RUNNING: - break; - case SHUTDOWN_ALREADY: - break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..7d496acf0200113996598a139a0063f481027550 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.consumer; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceService; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultLitePullConsumerTest { + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + @Mock + private MQAdminImpl mQAdminImpl; + + private RebalanceImpl rebalanceImpl; + private OffsetStore offsetStore; + private DefaultLitePullConsumerImpl litePullConsumerImpl; + private String consumerGroup = "LitePullConsumerGroup"; + private String topic = "LitePullConsumerTest"; + private String brokerName = "BrokerA"; + private boolean flag = false; + + @Before + public void init() throws Exception { + Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); + field.setAccessible(true); + RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); + field = RebalanceService.class.getDeclaredField("waitInterval"); + field.setAccessible(true); + field.set(rebalanceService, 100); + } + + @Test + public void testAssign_PollMessageSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testSubscribe_PollMessageSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createSubscribeLitePullConsumer(); + try { + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createMessageQueue()); + litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + litePullConsumer.setPollTimeoutMillis(20 * 1000); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testSubscribe_BroadcastPollMessageSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createBroadcastLitePullConsumer(); + try { + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createMessageQueue()); + litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + litePullConsumer.setPollTimeoutMillis(20 * 1000); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testSubscriptionType_AssignAndSubscribeExclusive() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + try { + litePullConsumer.subscribe(topic, "*"); + litePullConsumer.assign(Collections.singletonList(createMessageQueue())); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("Subscribe and assign are mutually exclusive."); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testFetchMessageQueues_FetchMessageQueuesBeforeStart() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + litePullConsumer.fetchMessageQueues(topic); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("The consumer not running, please start it first."); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testSeek_SeekOffsetSuccess() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); + when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L); + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + long offset = litePullConsumer.committed(messageQueue); + litePullConsumer.seek(messageQueue, offset); + Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue"); + field.setAccessible(true); + AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl); + assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), offset); + litePullConsumer.shutdown(); + } + + @Test + public void testSeek_SeekOffsetIllegal() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L); + when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L); + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + try { + litePullConsumer.seek(messageQueue, -1); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("min offset = 0"); + } + + try { + litePullConsumer.seek(messageQueue, 1000); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("max offset = 100"); + } + litePullConsumer.shutdown(); + } + + @Test + public void testSeek_MessageQueueNotInAssignList() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + try { + litePullConsumer.seek(createMessageQueue(), 0); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("The message queue is not in assigned list"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createSubscribeLitePullConsumer(); + try { + litePullConsumer.seek(createMessageQueue(), 0); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("The message queue is not in assigned list, may be rebalancing"); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testOffsetForTimestamp_FailedAndSuccess() throws Exception { + MessageQueue messageQueue = createMessageQueue(); + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + litePullConsumer.offsetForTimestamp(messageQueue, 123456L); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("The consumer not running, please start it first."); + } finally { + litePullConsumer.shutdown(); + } + doReturn(123L).when(mQAdminImpl).searchOffset(any(MessageQueue.class), anyLong()); + litePullConsumer = createStartLitePullConsumer(); + long offset = litePullConsumer.offsetForTimestamp(messageQueue, 123456L); + assertThat(offset).isEqualTo(123L); + } + + @Test + public void testPauseAndResume_Success() throws Exception { + DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.pause(Collections.singletonList(messageQueue)); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + List result = litePullConsumer.poll(); + assertThat(result.isEmpty()).isTrue(); + litePullConsumer.resume(Collections.singletonList(messageQueue)); + result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testRegisterTopicMessageQueueChangeListener_Success() throws Exception { + flag = false; + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); + litePullConsumer.setTopicMetadataCheckIntervalMillis(10); + litePullConsumer.registerTopicMessageQueueChangeListener(topic, new TopicMessageQueueChangeListener() { + @Override public void onChanged(String topic, Set messageQueues) { + flag = true; + } + }); + Set set = new HashSet(); + set.add(createMessageQueue()); + doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString()); + Thread.sleep(11 * 1000); + assertThat(flag).isTrue(); + } + + @Test + public void testFlowControl_Success() throws Exception { + DefaultLitePullConsumer litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdForAll(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdForQueue(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setPullThresholdSizeForQueue(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = createStartLitePullConsumer(); + try { + MessageQueue messageQueue = createMessageQueue(); + litePullConsumer.setConsumeMaxSpan(-1); + litePullConsumer.assign(Collections.singletonList(messageQueue)); + litePullConsumer.setPollTimeoutMillis(500); + List result = litePullConsumer.poll(); + assertThat(result).isEmpty(); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testCheckConfig_Exception() { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(MixAll.DEFAULT_CONSUMER_GROUP); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("consumerGroup can not equal"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setMessageModel(null); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("messageModel is null"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setAllocateMessageQueueStrategy(null); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("allocateMessageQueueStrategy is null"); + } finally { + litePullConsumer.shutdown(); + } + + litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setConsumerTimeoutMillisWhenSuspend(1); + try { + litePullConsumer.start(); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"); + } finally { + litePullConsumer.shutdown(); + } + + } + + private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { + + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(litePullConsumerImpl, mQClientFactory); + + PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper(); + field = PullAPIWrapper.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pullAPIWrapper, mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + field = MQClientInstance.class.getDeclaredField("mQAdminImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQAdminImpl); + + field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl); + field = RebalanceImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(rebalanceImpl, mQClientFactory); + + offsetStore = spy(litePullConsumerImpl.getOffsetStore()); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore"); + field.setAccessible(true); + field.set(litePullConsumerImpl, offsetStore); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + return pullResult; + } + }); + + when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + + doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); + + doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); + } + + private DefaultLitePullConsumer createSubscribeLitePullConsumer() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.subscribe(topic, "*"); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + return litePullConsumer; + } + + private DefaultLitePullConsumer createStartLitePullConsumer() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + return litePullConsumer; + } + + private DefaultLitePullConsumer createNotStartLitePullConsumer() { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + return litePullConsumer; + } + + private DefaultLitePullConsumer createBroadcastLitePullConsumer() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.setMessageModel(MessageModel.BROADCASTING); + litePullConsumer.subscribe(topic, "*"); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + return litePullConsumer; + } + + private MessageQueue createMessageQueue() { + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + return messageQueue; + } + + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List messageExtList) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (MessageExt messageExt : messageExtList) { + outputStream.write(MessageDecoder.encode(messageExt, false)); + } + return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java index fb1f9bbde7110fe0b9a500e771cf70a8fe4f0c95..28e02341c69b6ea96a16d08cb22b0b06480c8a82 100644 --- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -50,4 +50,4 @@ public class PushConsumer { consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } -} +} \ No newline at end of file diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java new file mode 100644 index 0000000000000000000000000000000000000000..e638de1c960ba9fd37a9f16515161d2aa3afe712 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAssign.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public class LitePullConsumerAssign { + + public static volatile boolean running = true; + + public static void main(String[] args) throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); + litePullConsumer.setAutoCommit(false); + litePullConsumer.start(); + Collection mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); + List list = new ArrayList<>(mqSet); + List assignList = new ArrayList<>(); + for (int i = 0; i < list.size() / 2; i++) { + assignList.add(list.get(i)); + } + litePullConsumer.assign(assignList); + litePullConsumer.seek(assignList.get(0), 10); + try { + while (running) { + List messageExts = litePullConsumer.poll(); + System.out.printf("%s %n", messageExts); + litePullConsumer.commitSync(); + } + } finally { + litePullConsumer.shutdown(); + } + + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..1bfe49d7365e0460e3ef4bc3b290596a1f66867b --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageExt; + +public class LitePullConsumerSubscribe { + + public static volatile boolean running = true; + + public static void main(String[] args) throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); + litePullConsumer.subscribe("TopicTest", "*"); + litePullConsumer.start(); + try { + while (running) { + List messageExts = litePullConsumer.poll(); + System.out.printf("%s%n", messageExts); + } + } finally { + litePullConsumer.shutdown(); + } + } +}