diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 91e9f887b6dd1b510f5afdafb1ad478d9806e5cb..dc08ac12adebc2d434285b563c3141907387b518 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; * Thread Safety: After initialization, the instance can be regarded as thread-safe. *

*/ -public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { +public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer { private final InternalLogger log = ClientLogger.getLog(); @@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Whether update subscription relationship when every pull */ - private boolean postSubscriptionWhenPull = true; + private boolean postSubscriptionWhenPull = false; /** * Whether the unit of subscription group @@ -275,24 +275,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } - /** - * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. - * - * @param consumerGroup Consume queue. - * @param rpcHook RPC hook to execute before each remoting command. - * @param allocateMessageQueueStrategy Message queue allocating algorithm. - */ - public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) { - this.consumerGroup = consumerGroup; - if (allocateMessageQueueStrategy == null) { - this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); - } else { - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - } - defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel); - } - /** * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and * customized trace topic name. diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..b4ca6f7ea079c168fd83ae97203b1ac998f720c5 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +/** + * In most scenarios, this is the mostly recommended class to consume messages. + *

+ * + * Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on + * arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages. + *

+ * + * See quickstart/Consumer in the example module for a typical usage. + *

+ * + *

+ * Thread Safety: After initialization, the instance can be regarded as thread-safe. + *

+ */ +public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPushConsumer { + + private final InternalLogger log = ClientLogger.getLog(); + + /** + * Internal implementation. Most of the functions herein are delegated to it. + */ + protected final transient DefaultMQRealPushConsumerImpl defaultMQPushConsumerImpl; + + /** + * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve + * load balance. It's required and needs to be globally unique. + *

+ * + * See here for further discussion. + */ + private String consumerGroup; + + /** + * Message model defines the way how messages are delivered to each consumer clients. + *

+ * + * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with + * the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load + * balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages + * separately. + *

+ * + * This field defaults to clustering. + */ + private MessageModel messageModel = MessageModel.CLUSTERING; + + /** + * Consuming point on consumer booting. + *

+ * + * There are three consuming points: + * + */ + private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + + /** + * Backtracking consumption time with second precision. Time format is 20131223171201
Implying Seventeen twelve + * and 01 seconds on December 23, 2013 year
Default backtracking consumption time Half an hour ago. + */ + private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + + /** + * Queue allocation algorithm specifying how message queues are allocated to each consumer clients. + */ + private AllocateMessageQueueStrategy allocateMessageQueueStrategy; + + /** + * Subscription relationship + */ + private Map subscription = new HashMap(); + + /** + * Message listener + */ + private MessageListener messageListener; + + /** + * Offset Storage + */ + private OffsetStore offsetStore; + + /** + * Minimum consumer thread number + */ + private int consumeThreadMin = 20; + + /** + * Max consumer thread number + */ + private int consumeThreadMax = 64; + + /** + * Threshold for dynamic adjustment of the number of thread pool + */ + private long adjustThreadPoolNumsThreshold = 100000; + + /** + * Concurrently max span offset.it has no effect on sequential consumption + */ + private int consumeConcurrentlyMaxSpan = 2000; + + /** + * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider + * the {@code pullBatchSize}, the instantaneous value may exceed the limit + */ + private int pullThresholdForQueue = 1000; + + /** + * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, + * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit + * + *

+ * The size of a message only measured by message body, so it's not accurate + */ + private int pullThresholdSizeForQueue = 100; + + /** + * Flow control threshold on topic level, default value is -1(Unlimited) + *

+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code + * pullThresholdForTopic} if it is't unlimited + *

+ * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, + * then pullThresholdForQueue will be set to 100 + */ + private int pullThresholdForTopic = -1; + + /** + * Limit the cached message size on topic level, default value is -1 MiB(Unlimited) + *

+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code + * pullThresholdSizeForTopic} if it is't unlimited + *

+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this + * consumer, then pullThresholdSizeForQueue will be set to 100 MiB + */ + private int pullThresholdSizeForTopic = -1; + + /** + * Message pull Interval + */ + private long pullInterval = 0; + + /** + * Batch consumption size + */ + private int consumeMessageBatchMaxSize = 1; + + /** + * Batch pull size + */ + private int pullBatchSize = 32; + + /** + * Whether update subscription relationship when every pull + */ + private boolean postSubscriptionWhenPull = true; + + /** + * Whether the unit of subscription group + */ + private boolean unitMode = false; + + /** + * Max re-consume times. -1 means 16 times. + *

+ * + * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion + * queue waiting. + */ + private int maxReconsumeTimes = -1; + + /** + * Suspending pulling time for cases requiring slow pulling like flow-control scenario. + */ + private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Maximum amount of time in minutes a message may block the consuming thread. + */ + private long consumeTimeout = 15; + + /** + * Interface of asynchronous transfer data + */ + private TraceDispatcher traceDispatcher = null; + + /** + * Default constructor. + */ + public DefaultMQRealPushConsumer() { + this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + } + + /** + * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy Message queue allocating algorithm. + */ + public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook); + } + + /** + * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy Message queue allocating algorithm. + */ + public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) { + this.consumerGroup = consumerGroup; + if (allocateMessageQueueStrategy == null) { + this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + } else { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook, realPushModel); + } + + /** + * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and + * customized trace topic name. + * + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, + final String customizedTraceTopic) { + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook); + if (enableMsgTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); + traceDispatcher = dispatcher; + this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + + /** + * Constructor specifying RPC hook. + * + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultMQRealPushConsumer(RPCHook rpcHook) { + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + } + + /** + * Constructor specifying consumer group and enabled msg trace flag. + * + * @param consumerGroup Consumer group. + * @param enableMsgTrace Switch flag instance for message trace. + */ + public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null); + } + + /** + * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. + * + * @param consumerGroup Consumer group. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace, + final String customizedTraceTopic) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); + } + + /** + * Constructor specifying consumer group. + * + * @param consumerGroup Consumer group. + */ + public DefaultMQRealPushConsumer(final String consumerGroup) { + this(consumerGroup, null, new AllocateMessageQueueAveragely()); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.maxOffset(mq); + } + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); + } + + @Override + public MessageExt viewMessage( + String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageDecoder.decodeMessageId(msgId); + return this.viewMessage(msgId); + } catch (Exception e) { + // Ignore + } + return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId); + } + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + public int getConsumeConcurrentlyMaxSpan() { + return consumeConcurrentlyMaxSpan; + } + + public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) { + this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan; + } + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } + + public int getConsumeMessageBatchMaxSize() { + return consumeMessageBatchMaxSize; + } + + public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { + this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public int getConsumeThreadMax() { + return consumeThreadMax; + } + + public void setConsumeThreadMax(int consumeThreadMax) { + this.consumeThreadMax = consumeThreadMax; + } + + public int getConsumeThreadMin() { + return consumeThreadMin; + } + + public void setConsumeThreadMin(int consumeThreadMin) { + this.consumeThreadMin = consumeThreadMin; + } + + public MQPushConsumerInner getDefaultMQPushConsumerImpl() { + return defaultMQPushConsumerImpl; + } + + public MessageListener getMessageListener() { + return messageListener; + } + + public void setMessageListener(MessageListener messageListener) { + this.messageListener = messageListener; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + public int getPullBatchSize() { + return pullBatchSize; + } + + public void setPullBatchSize(int pullBatchSize) { + this.pullBatchSize = pullBatchSize; + } + + public long getPullInterval() { + return pullInterval; + } + + public void setPullInterval(long pullInterval) { + this.pullInterval = pullInterval; + } + + public int getPullThresholdForQueue() { + return pullThresholdForQueue; + } + + public void setPullThresholdForQueue(int pullThresholdForQueue) { + this.pullThresholdForQueue = pullThresholdForQueue; + } + + public int getPullThresholdForTopic() { + return pullThresholdForTopic; + } + + public void setPullThresholdForTopic(final int pullThresholdForTopic) { + this.pullThresholdForTopic = pullThresholdForTopic; + } + + public int getPullThresholdSizeForQueue() { + return pullThresholdSizeForQueue; + } + + public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) { + this.pullThresholdSizeForQueue = pullThresholdSizeForQueue; + } + + public int getPullThresholdSizeForTopic() { + return pullThresholdSizeForTopic; + } + + public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) { + this.pullThresholdSizeForTopic = pullThresholdSizeForTopic; + } + + public Map getSubscription() { + return subscription; + } + + public void setSubscription(Map subscription) { + this.subscription = subscription; + } + + /** + * Send message back to broker which will be re-delivered in future. + * + * @param msg Message to send back. + * @param delayLevel delay level. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ + @Override + public void sendMessageBack(MessageExt msg, int delayLevel) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); + } + + /** + * Send message back to the broker whose name is brokerName and the message will be re-delivered in + * future. + * + * @param msg Message to send back. + * @param delayLevel delay level. + * @param brokerName broker name. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ + @Override + public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); + } + + @Override + public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { + return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); + } + + /** + * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. + * + * @throws MQClientException if there is any client error. + */ + @Override + public void start() throws MQClientException { + this.defaultMQPushConsumerImpl.start(); + if (null != traceDispatcher) { + try { + traceDispatcher.start(this.getNamesrvAddr()); + } catch (MQClientException e) { + log.warn("trace dispatcher start failed ", e); + } + } + } + + /** + * Shut down this client and releasing underlying resources. + */ + @Override + public void shutdown() { + this.defaultMQPushConsumerImpl.shutdown(); + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } + } + + @Override + @Deprecated + public void registerMessageListener(MessageListener messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + /** + * Register a callback to execute on message arrival for concurrent consuming. + * + * @param messageListener message handling callback. + */ + @Override + public void registerMessageListener(MessageListenerConcurrently messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + /** + * Register a callback to execute on message arrival for orderly consuming. + * + * @param messageListener message handling callback. + */ + @Override + public void registerMessageListener(MessageListenerOrderly messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + /** + * Subscribe a topic to consuming subscription. + * + * @param topic topic to subscribe. + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if + * null or * expression,meaning subscribe all + * @throws MQClientException if there is any client error. + */ + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); + } + + /** + * Subscribe a topic to consuming subscription. + * + * @param topic topic to consume. + * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter + * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety + */ + @Override + public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); + } + + /** + * Subscribe a topic by message selector. + * + * @param topic topic to consume. + * @param messageSelector {@link MessageSelector} + * @see MessageSelector#bySql + * @see MessageSelector#byTag + */ + @Override + public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector); + } + + /** + * Un-subscribe the specified topic from subscription. + * + * @param topic message topic + */ + @Override + public void unsubscribe(String topic) { + this.defaultMQPushConsumerImpl.unsubscribe(topic); + } + + /** + * Update the message consuming thread core pool size. + * + * @param corePoolSize new core pool size. + */ + @Override + public void updateCorePoolSize(int corePoolSize) { + this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); + } + + /** + * Suspend pulling new messages. + */ + @Override + public void suspend() { + this.defaultMQPushConsumerImpl.suspend(); + } + + /** + * Resume pulling. + */ + @Override + public void resume() { + this.defaultMQPushConsumerImpl.resume(); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + public boolean isPostSubscriptionWhenPull() { + return postSubscriptionWhenPull; + } + + public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) { + this.postSubscriptionWhenPull = postSubscriptionWhenPull; + } + + public boolean isUnitMode() { + return unitMode; + } + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + public long getAdjustThreadPoolNumsThreshold() { + return adjustThreadPoolNumsThreshold; + } + + public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) { + this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; + } + + public int getMaxReconsumeTimes() { + return maxReconsumeTimes; + } + + public void setMaxReconsumeTimes(final int maxReconsumeTimes) { + this.maxReconsumeTimes = maxReconsumeTimes; + } + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } + + public long getConsumeTimeout() { + return consumeTimeout; + } + + public void setConsumeTimeout(final long consumeTimeout) { + this.consumeTimeout = consumeTimeout; + } + + public TraceDispatcher getTraceDispatcher() { + return traceDispatcher; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java index bc6d328c430e8f2fb183e7c7b51a33e9d614e30b..e60085fefae5dec8a9a8ede4c0cb8dc2e64445e4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner; /** * Push consumer @@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer { * Subscribe some topic * * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if - * null or * expression,meaning subscribe - * all + * null or * expression,meaning subscribe all */ void subscribe(final String topic, final String subExpression) throws MQClientException; /** - * This method will be removed in the version 5.0.0,because filterServer was removed,and method subscribe(final String topic, final MessageSelector messageSelector) - * is recommended. + * This method will be removed in the version 5.0.0,because filterServer was removed,and method + * subscribe(final String topic, final MessageSelector messageSelector) is recommended. * * Subscribe some topic * @@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer { /** * Subscribe some topic with selector. *

- * This interface also has the ability of {@link #subscribe(String, String)}, - * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}. + * This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection, + * such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}. *

*

*

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..80e4e346153fec28f8baf21b9e08d40eff5ecead --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +/** + * Real Push consumer + */ +public interface MQRealPushConsumer extends MQPushConsumer { + + MQPushConsumerInner getDefaultMQPushConsumerImpl(); + + long getConsumeTimeout(); + + int getPullThresholdForTopic(); + + int getPullThresholdForQueue(); + + void setPullThresholdForQueue(int pullThresholdForQueue); + + int getPullThresholdSizeForTopic(); + + void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic); + + int getPullThresholdSizeForQueue(); + + void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue); + + ConsumeFromWhere getConsumeFromWhere(); + + String getConsumeTimestamp(); + + String getConsumerGroup(); + + int getConsumeThreadMin(); + + void setConsumeThreadMin(int consumeThreadMin); + + int getConsumeThreadMax(); + + long getSuspendCurrentQueueTimeMillis(); + + int getMaxReconsumeTimes(); + + void setMaxReconsumeTimes(final int maxReconsumeTimes); + + int getConsumeMessageBatchMaxSize(); + + MessageModel getMessageModel(); + +} \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 58b985f10111e66a6cb4a3b14a5830162a0f0fe8..c6c3f2daed146706cd758aeffa61b4a63624a481 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQRealPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; @@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); - private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - private final DefaultMQPushConsumer defaultMQPushConsumer; + private final MQPushConsumerInner defaultMQPushConsumerImpl; + private final MQRealPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; @@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService cleanExpireMsgExecutors; - public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, + public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index d4a9953e783f06e16b1d07a5f7b18bea8dbfaafe..e1717f83ce057fe9041904b1526c2fb52d99b344 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQRealPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; @@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); private final static long MAX_TIME_CONSUME_CONTINUOUSLY = Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); - private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - private final DefaultMQPushConsumer defaultMQPushConsumer; + private final MQPushConsumerInner defaultMQPushConsumerImpl; + private final MQRealPushConsumer defaultMQPushConsumer; private final MessageListenerOrderly messageListener; private final BlockingQueue consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; @@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { private final ScheduledExecutorService scheduledExecutorService; private volatile boolean stopped = false; - public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, + public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ff5f418a9d33525228fc017832f76e2bea6d8b70..6108530997e66f429fa231d839099281b3325070 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -26,10 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; -public class DefaultMQPushConsumerImpl implements MQConsumerInner { +public class DefaultMQPushConsumerImpl implements MQPushConsumerInner { /** * Delay some time when exception occur */ @@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPushConsumer defaultMQPushConsumer; - //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); - private final RebalanceImpl rebalanceImpl; + private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); private final ArrayList filterMessageHookList = new ArrayList(); private final long consumerStartTimestamp = System.currentTimeMillis(); private final ArrayList consumeMessageHookList = new ArrayList(); @@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private ConsumeMessageService consumeMessageService; private long queueFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0; - private boolean realPushModel = true; - private final ConcurrentHashMap localConsumerOffset = new ConcurrentHashMap(); - private final ConcurrentHashMap pullStopped = new ConcurrentHashMap(); - private final ConcurrentHashMap processQueues = new ConcurrentHashMap(); public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { - this(defaultMQPushConsumer, rpcHook, true); - } - - public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook, - boolean realPushModel) { this.defaultMQPushConsumer = defaultMQPushConsumer; this.rpcHook = rpcHook; - this.realPushModel = realPushModel; - if (realPushModel) { - log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup()); - rebalanceImpl = new RebalanceRealPushImpl(this); - } else { - rebalanceImpl = new RebalancePushImpl(this); - } } public void registerFilterMessageHook(final FilterMessageHook hook) { @@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { - //Update local offset according remote offset - String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), - pullRequest.getMessageQueue().getBrokerName(), - pullRequest.getMessageQueue().getQueueId()); - AtomicLong localOffset = localConsumerOffset.get(localOffsetKey); - if (localOffset == null) { - localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1)); - } - localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset()); - pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); @@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), - pullRequest.getMessageQueue().getBrokerName(), - pullRequest.getMessageQueue().getQueueId()); - if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { - //Stop pull request - log.info("Stop pull request, {}", localOffsetKey); - return; - } this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); } @@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public void executePullRequestImmediately(final PullRequest pullRequest) { - String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), - pullRequest.getMessageQueue().getBrokerName(), - pullRequest.getMessageQueue().getQueueId()); - if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { - //Stop pull request - log.info("Stop pull request, {}", localOffsetKey); - return; - } this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); } @@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { - String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg, + String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) + : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); @@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { default: break; } - this.tryToFindSnodePublishInfo(); + this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); - this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); } @@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup()); - } } } @@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { @Override public ConsumeType consumeType() { - if (realPushModel) { - return ConsumeType.CONSUME_PUSH; - } else { - return ConsumeType.CONSUME_PASSIVELY; - } + return ConsumeType.CONSUME_PASSIVELY; } @Override @@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } - private void tryToFindSnodePublishInfo() { - this.mQClientFactory.updateSnodeInfoFromNameServer(); - } - - public boolean processPushMessage(final MessageExt msg, - final String consumerGroup, - final String topic, - final String brokerName, - final int queueID, - final long offset) { - String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); - AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey); - if (localOffset == null) { - log.info("Current Local offset have not set, initiallized to -1."); - this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); - return false; - } - if (localOffset.get() + 1 < offset) { - //should start pull message process - log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); - return false; - } else { - //Stop pull request - log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); - AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); - if (pullStop == null) { - this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); - log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); - } - pullStop = this.pullStopped.get(localOffsetKey); - if (!pullStop.get()) { - pullStop.set(true); - log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); - } - //update local offset - localOffset.set(offset); - //submit to process queue - List messageExtList = new ArrayList(); - messageExtList.add(msg); - ProcessQueue processQueue = processQueues.get(localOffsetKey); - if (processQueue == null) { - processQueues.put(localOffsetKey, new ProcessQueue()); - processQueue = processQueues.get(localOffsetKey); - } - processQueue.putMessage(messageExtList); - MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID); - this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); - } - return true; - } - - private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) { - return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID; - } - public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { - String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); - ProcessQueue processQueue = processQueues.get(localOffsetKey); - if (processQueue != null) { - log.info("Clear local expire message for {} in processQueue.", localOffsetKey); - processQueue.cleanExpiredMsg(this.defaultMQPushConsumer); - } - AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); - if (pullStop != null) { - if (pullStop.get()) { - pullStop.set(false); - log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey); - } - } - return true; - } - - public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { - String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); - AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); - if (pullStop == null) { - this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); - log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); - return true; - } - if (!pullStop.get()) { - pullStop.set(true); - log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); - } return true; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..785e5b800a657c818d4cfe6edf8cfde124c45da1 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java @@ -0,0 +1,1281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.MQRealPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.hook.FilterMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.stat.ConsumerStatsManager; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { + /** + * Delay some time when exception occur + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; + /** + * Flow control interval + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + /** + * Delay some time when suspend pull service + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; + private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; + private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; + private final InternalLogger log = ClientLogger.getLog(); + private final DefaultMQRealPushConsumer defaultMQPushConsumer; + //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); + private final RebalanceImpl rebalanceImpl; + private final ArrayList filterMessageHookList = new ArrayList(); + private final long consumerStartTimestamp = System.currentTimeMillis(); + private final ArrayList consumeMessageHookList = new ArrayList(); + private final RPCHook rpcHook; + private volatile ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mQClientFactory; + private PullAPIWrapper pullAPIWrapper; + private volatile boolean pause = false; + private boolean consumeOrderly = false; + private MessageListener messageListenerInner; + private OffsetStore offsetStore; + private ConsumeMessageService consumeMessageService; + private long queueFlowControlTimes = 0; + private long queueMaxSpanFlowControlTimes = 0; + private boolean realPushModel = true; + private final ConcurrentHashMap localConsumerOffset = new ConcurrentHashMap(); + private final ConcurrentHashMap pullStopped = new ConcurrentHashMap(); + private final ConcurrentHashMap processQueues = new ConcurrentHashMap(); + + public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { + this(defaultMQPushConsumer, rpcHook, true); + } + + public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook, + boolean realPushModel) { + this.defaultMQPushConsumer = defaultMQPushConsumer; + this.rpcHook = rpcHook; + this.realPushModel = realPushModel; + log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup()); + rebalanceImpl = new RebalanceRealPushImpl(this); + } + + public void registerFilterMessageHook(final FilterMessageHook hook) { + this.filterMessageHookList.add(hook); + log.info("register FilterMessageHook Hook, {}", hook.hookName()); + } + + public boolean hasHook() { + return !this.consumeMessageHookList.isEmpty(); + } + + public void registerConsumeMessageHook(final ConsumeMessageHook hook) { + this.consumeMessageHookList.add(hook); + log.info("register consumeMessageHook Hook, {}", hook.hookName()); + } + + public void executeHookBefore(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageBefore(context); + } catch (Throwable e) { + } + } + } + } + + public void executeHookAfter(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageAfter(context); + } catch (Throwable e) { + } + } + } + } + + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { + Set result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + if (null == result) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + } + + if (null == result) { + throw new MQClientException("The topic[" + topic + "] not exist", null); + } + + return result; + } + + public MQRealPushConsumer getDefaultMQPushConsumer() { + return defaultMQPushConsumer; + } + + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); + } + + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } + + public long minOffset(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().minOffset(mq); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + public void pullMessage(final PullRequest pullRequest) { + final ProcessQueue processQueue = pullRequest.getProcessQueue(); + if (processQueue.isDropped()) { + log.info("the pull request[{}] is dropped.", pullRequest.toString()); + return; + } + + pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); + + try { + this.makeSureStateOK(); + } catch (MQClientException e) { + log.warn("pullMessage exception, consumer state not ok", e); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + return; + } + + if (this.isPause()) { + log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); + return; + } + + long cachedMessageCount = processQueue.getMsgCount().get(); + long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + + if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); + } + return; + } + + if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); + } + return; + } + + if (!this.consumeOrderly) { + if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { + log.warn( + "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), + pullRequest, queueMaxSpanFlowControlTimes); + } + return; + } + } else { + if (processQueue.isLocked()) { + if (!pullRequest.isLockedFirst()) { + final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); + boolean brokerBusy = offset < pullRequest.getNextOffset(); + log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", + pullRequest, offset, brokerBusy); + if (brokerBusy) { + log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", + pullRequest, offset); + } + + pullRequest.setLockedFirst(true); + pullRequest.setNextOffset(offset); + } + } else { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + log.info("pull message later because not locked in broker, {}", pullRequest); + return; + } + } + + final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); + if (null == subscriptionData) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + log.warn("find the consumer's subscription failed, {}", pullRequest); + return; + } + + final long beginTimestamp = System.currentTimeMillis(); + + PullCallback pullCallback = new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + if (pullResult != null) { + //Update local offset according remote offset + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); + AtomicLong localOffset = localConsumerOffset.get(localOffsetKey); + if (localOffset == null) { + localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1)); + } + localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset()); + + pullResult = DefaultMQRealPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, + subscriptionData); + + switch (pullResult.getPullStatus()) { + case FOUND: + long prevRequestOffset = pullRequest.getNextOffset(); + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + long pullRT = System.currentTimeMillis() - beginTimestamp; + DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), pullRT); + + long firstMsgOffset = Long.MAX_VALUE; + if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { + DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + } else { + firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); + + DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); + + boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); + DefaultMQRealPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( + pullResult.getMsgFoundList(), + processQueue, + pullRequest.getMessageQueue(), + dispatchToConsume); + + if (DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { + DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest, + DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); + } else { + DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + } + } + + if (pullResult.getNextBeginOffset() < prevRequestOffset + || firstMsgOffset < prevRequestOffset) { + log.warn( + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", + pullResult.getNextBeginOffset(), + firstMsgOffset, + prevRequestOffset); + } + + break; + case NO_NEW_MSG: + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest); + + DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + break; + case NO_MATCHED_MSG: + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest); + + DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + break; + case OFFSET_ILLEGAL: + log.warn("the pull request offset illegal, {} {}", + pullRequest.toString(), pullResult.toString()); + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + pullRequest.getProcessQueue().setDropped(true); + DefaultMQRealPushConsumerImpl.this.executeTaskLater(new Runnable() { + + @Override + public void run() { + try { + DefaultMQRealPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), + pullRequest.getNextOffset(), false); + + DefaultMQRealPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); + + DefaultMQRealPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); + + log.warn("fix the pull request offset, {}", pullRequest); + } catch (Throwable e) { + log.error("executeTaskLater Exception", e); + } + } + }, 10000); + break; + default: + break; + } + } + } + + @Override + public void onException(Throwable e) { + if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + log.warn("execute the pull request exception", e); + } + + DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + } + }; + + boolean commitOffsetEnable = false; + long commitOffsetValue = 0L; + if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { + commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); + if (commitOffsetValue > 0) { + commitOffsetEnable = true; + } + } + + String subExpression = null; + boolean classFilter = false; + SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); + if (sd != null) { + if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { + subExpression = sd.getSubString(); + } + + classFilter = sd.isClassFilterMode(); + } + + int sysFlag = PullSysFlag.buildSysFlag( + commitOffsetEnable, // commitOffset + true, // suspend + subExpression != null, // subscription + classFilter // class filter + ); + try { + this.pullAPIWrapper.pullKernelImpl( + pullRequest.getMessageQueue(), + subExpression, + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion(), + pullRequest.getNextOffset(), + this.defaultMQPushConsumer.getPullBatchSize(), + sysFlag, + commitOffsetValue, + BROKER_SUSPEND_MAX_TIME_MILLIS, + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, + CommunicationMode.ASYNC, + pullCallback + ); + } catch (Exception e) { + log.error("pullKernelImpl exception", e); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + } + } + + private void makeSureStateOK() throws MQClientException { + if (this.serviceState != ServiceState.RUNNING) { + throw new MQClientException("The consumer service state not OK, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + } + } + + private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); + if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { + //Stop pull request + log.info("Stop pull request, {}", localOffsetKey); + return; + } + this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); + } + + public boolean isPause() { + return pause; + } + + public void setPause(boolean pause) { + this.pause = pause; + } + + public ConsumerStatsManager getConsumerStatsManager() { + return this.mQClientFactory.getConsumerStatsManager(); + } + + public void executePullRequestImmediately(final PullRequest pullRequest) { + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); + if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { + //Stop pull request + log.info("Stop pull request, {}", localOffsetKey); + return; + } + this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); + } + + private void correctTagsOffset(final PullRequest pullRequest) { + if (0L == pullRequest.getProcessQueue().getMsgCount().get()) { + this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); + } + } + + public void executeTaskLater(final Runnable r, final long timeDelay) { + this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay); + } + + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); + } + + public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, + InterruptedException { + return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); + } + + public void registerMessageListener(MessageListener messageListener) { + this.messageListenerInner = messageListener; + } + + public void resume() { + this.pause = false; + doRebalance(); + log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); + } + + public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg, + this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); + } catch (Exception e) { + log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); + + Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); + + String originMsgId = MessageAccessor.getOriginMessageId(msg); + MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); + + newMsg.setFlag(msg.getFlag()); + MessageAccessor.setProperties(newMsg, msg.getProperties()); + MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); + MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); + MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); + + this.mQClientFactory.getDefaultMQProducer().send(newMsg); + } + } + + private int getMaxReconsumeTimes() { + // default reconsume times: 16 + if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { + return 16; + } else { + return this.defaultMQPushConsumer.getMaxReconsumeTimes(); + } + } + + public synchronized void shutdown() { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + this.consumeMessageService.shutdown(); + this.persistConsumerOffset(); + this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); + this.mQClientFactory.shutdown(); + log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); + this.rebalanceImpl.destroy(); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } + + public synchronized void start() throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), + this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + this.copySubscription(); + + if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { + this.defaultMQPushConsumer.changeInstanceNameToPID(); + } + + this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); + + this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); + this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); + this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); + this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); + + this.pullAPIWrapper = new PullAPIWrapper( + mQClientFactory, + this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); + this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); + + if (this.defaultMQPushConsumer.getOffsetStore() != null) { + this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); + } else { + switch (this.defaultMQPushConsumer.getMessageModel()) { + case BROADCASTING: + this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); + break; + case CLUSTERING: + this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); + break; + default: + break; + } + this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); + } + this.offsetStore.load(); + + if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { + this.consumeOrderly = true; + this.consumeMessageService = + new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); + } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { + this.consumeOrderly = false; + this.consumeMessageService = + new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); + } + + this.consumeMessageService.start(); + + boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + this.consumeMessageService.shutdown(); + throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + + mQClientFactory.start(); + log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); + this.serviceState = ServiceState.RUNNING; + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + this.tryToFindSnodePublishInfo(); + this.updateTopicSubscribeInfoWhenSubscriptionChanged(); + this.mQClientFactory.checkClientInBroker(); + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + this.mQClientFactory.rebalanceImmediately(); + } + + private void checkConfig() throws MQClientException { + Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); + + if (null == this.defaultMQPushConsumer.getConsumerGroup()) { + throw new MQClientException( + "consumerGroup is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { + throw new MQClientException( + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (null == this.defaultMQPushConsumer.getMessageModel()) { + throw new MQClientException( + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) { + throw new MQClientException( + "consumeFromWhere is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS); + if (null == dt) { + throw new MQClientException( + "consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received " + + this.defaultMQPushConsumer.getConsumeTimestamp() + + " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); + } + + // allocateMessageQueueStrategy + if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) { + throw new MQClientException( + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // subscription + if (null == this.defaultMQPushConsumer.getSubscription()) { + throw new MQClientException( + "subscription is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // messageListener + if (null == this.defaultMQPushConsumer.getMessageListener()) { + throw new MQClientException( + "messageListener is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly; + boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently; + if (!orderly && !concurrently) { + throw new MQClientException( + "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeThreadMin + if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 + || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) { + throw new MQClientException( + "consumeThreadMin Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeThreadMax + if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { + throw new MQClientException( + "consumeThreadMax Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeThreadMin can't be larger than consumeThreadMax + if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { + throw new MQClientException( + "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") " + + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")", + null); + } + + // consumeConcurrentlyMaxSpan + if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 + || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { + throw new MQClientException( + "consumeConcurrentlyMaxSpan Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullThresholdForQueue + if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) { + throw new MQClientException( + "pullThresholdForQueue Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullThresholdForTopic + if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) { + if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) { + throw new MQClientException( + "pullThresholdForTopic Out of range [1, 6553500]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + // pullThresholdSizeForQueue + if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) { + throw new MQClientException( + "pullThresholdSizeForQueue Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) { + // pullThresholdSizeForTopic + if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) { + throw new MQClientException( + "pullThresholdSizeForTopic Out of range [1, 102400]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + // pullInterval + if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) { + throw new MQClientException( + "pullInterval Out of range [0, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeMessageBatchMaxSize + if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 + || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { + throw new MQClientException( + "consumeMessageBatchMaxSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullBatchSize + if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) { + throw new MQClientException( + "pullBatchSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + private void copySubscription() throws MQClientException { + try { + Map sub = this.defaultMQPushConsumer.getSubscription(); + if (sub != null) { + for (final Entry entry : sub.entrySet()) { + final String topic = entry.getKey(); + final String subString = entry.getValue(); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), + topic, subString); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + } + } + + if (null == this.messageListenerInner) { + this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); + } + + switch (this.defaultMQPushConsumer.getMessageModel()) { + case BROADCASTING: + break; + case CLUSTERING: + final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), + retryTopic, SubscriptionData.SUB_ALL); + this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); + break; + default: + break; + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public MessageListener getMessageListenerInner() { + return messageListenerInner; + } + + private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map subTable = this.getSubscriptionInner(); + if (subTable != null) { + for (final Entry entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup()); + + } + } + } + + public ConcurrentMap getSubscriptionInner() { + return this.rebalanceImpl.getSubscriptionInner(); + } + + public void subscribe(String topic, String subExpression) throws MQClientException { + try { + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), + topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { + try { + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), + topic, "*"); + subscriptionData.setSubString(fullClassName); + subscriptionData.setClassFilterMode(true); + subscriptionData.setFilterClassSource(filterClassSource); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException { + try { + if (messageSelector == null) { + subscribe(topic, SubscriptionData.SUB_ALL); + return; + } + + SubscriptionData subscriptionData = FilterAPI.build(topic, + messageSelector.getExpression(), messageSelector.getExpressionType()); + + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public void suspend() { + this.pause = true; + log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); + } + + public void unsubscribe(String topic) { + this.rebalanceImpl.getSubscriptionInner().remove(topic); + } + + public void updateConsumeOffset(MessageQueue mq, long offset) { + this.offsetStore.updateOffset(mq, offset, false); + } + + public void updateCorePoolSize(int corePoolSize) { + this.consumeMessageService.updateCorePoolSize(corePoolSize); + } + + public MessageExt viewMessage(String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); + } + + public RebalanceImpl getRebalanceImpl() { + return rebalanceImpl; + } + + public boolean isConsumeOrderly() { + return consumeOrderly; + } + + public void setConsumeOrderly(boolean consumeOrderly) { + this.consumeOrderly = consumeOrderly; + } + + public void resetOffsetByTimeStamp(long timeStamp) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) { + Set mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + Map offsetTable = new HashMap(); + if (mqs != null) { + for (MessageQueue mq : mqs) { + long offset = searchOffset(mq, timeStamp); + offsetTable.put(mq, offset); + } + this.mQClientFactory.resetOffset(topic, groupName(), offsetTable); + } + } + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + @Override + public String groupName() { + return this.defaultMQPushConsumer.getConsumerGroup(); + } + + @Override + public MessageModel messageModel() { + return this.defaultMQPushConsumer.getMessageModel(); + } + + @Override + public ConsumeType consumeType() { + if (realPushModel) { + return ConsumeType.CONSUME_PUSH; + } else { + return ConsumeType.CONSUME_PASSIVELY; + } + } + + @Override + public ConsumeFromWhere consumeFromWhere() { + return this.defaultMQPushConsumer.getConsumeFromWhere(); + } + + @Override + public Set subscriptions() { + Set subSet = new HashSet(); + + subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); + + return subSet; + } + + @Override + public void doRebalance() { + if (!this.pause) { + this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); + } + } + + @Override + public void persistConsumerOffset() { + try { + this.makeSureStateOK(); + Set mqs = new HashSet(); + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + + this.offsetStore.persistAll(mqs); + } catch (Exception e) { + log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + } + } + + @Override + public void updateTopicSubscribeInfo(String topic, Set info) { + Map subTable = this.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info); + } + } + } + + @Override + public boolean isSubscribeTopicNeedUpdate(String topic) { + Map subTable = this.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); + } + } + + return false; + } + + @Override + public boolean isUnitMode() { + return this.defaultMQPushConsumer.isUnitMode(); + } + + @Override + public ConsumerRunningInfo consumerRunningInfo() { + ConsumerRunningInfo info = new ConsumerRunningInfo(); + + Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer); + + prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly)); + prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize())); + prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp)); + + info.setProperties(prop); + + Set subSet = this.subscriptions(); + info.getSubscriptionSet().addAll(subSet); + + Iterator> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + MessageQueue mq = next.getKey(); + ProcessQueue pq = next.getValue(); + + ProcessQueueInfo pqinfo = new ProcessQueueInfo(); + pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE)); + pq.fillProcessQueueInfo(pqinfo); + info.getMqTable().put(mq, pqinfo); + } + + for (SubscriptionData sd : subSet) { + ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic()); + info.getStatusTable().put(sd.getTopic(), consumeStatus); + } + + return info; + } + + public MQClientInstance getmQClientFactory() { + return mQClientFactory; + } + + public void setmQClientFactory(MQClientInstance mQClientFactory) { + this.mQClientFactory = mQClientFactory; + } + + public ServiceState getServiceState() { + return serviceState; + } + + //Don't use this deprecated setter, which will be removed soon. + @Deprecated + public synchronized void setServiceState(ServiceState serviceState) { + this.serviceState = serviceState; + } + + public void adjustThreadPool() { + long computeAccTotal = this.computeAccumulationTotal(); + long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold(); + + long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0); + + long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8); + + if (computeAccTotal >= incThreshold) { + this.consumeMessageService.incCorePoolSize(); + } + + if (computeAccTotal < decThreshold) { + this.consumeMessageService.decCorePoolSize(); + } + } + + private long computeAccumulationTotal() { + long msgAccTotal = 0; + ConcurrentMap processQueueTable = this.rebalanceImpl.getProcessQueueTable(); + Iterator> it = processQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry next = it.next(); + ProcessQueue value = next.getValue(); + msgAccTotal += value.getMsgAccCnt(); + } + + return msgAccTotal; + } + + public List queryConsumeTimeSpan(final String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + List queueTimeSpan = new ArrayList(); + TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000); + for (BrokerData brokerData : routeData.getBrokerDatas()) { + String addr = brokerData.selectBrokerAddr(); + queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000)); + } + + return queueTimeSpan; + } + + public ConsumeMessageService getConsumeMessageService() { + return consumeMessageService; + } + + public void setConsumeMessageService(ConsumeMessageService consumeMessageService) { + this.consumeMessageService = consumeMessageService; + + } + + private void tryToFindSnodePublishInfo() { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + } + + public boolean processPushMessage(final MessageExt msg, + final String consumerGroup, + final String topic, + final String brokerName, + final int queueID, + final long offset) { + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); + AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey); + if (localOffset == null) { + log.info("Current Local offset have not set, initiallized to -1."); + this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); + return false; + } + if (localOffset.get() + 1 < offset) { + //should start pull message process + log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); + return false; + } else { + //Stop pull request + log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop == null) { + this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); + log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); + } + pullStop = this.pullStopped.get(localOffsetKey); + if (!pullStop.get()) { + pullStop.set(true); + log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); + } + //update local offset + localOffset.set(offset); + //submit to process queue + List messageExtList = new ArrayList(); + messageExtList.add(msg); + ProcessQueue processQueue = processQueues.get(localOffsetKey); + if (processQueue == null) { + processQueues.put(localOffsetKey, new ProcessQueue()); + processQueue = processQueues.get(localOffsetKey); + } + processQueue.putMessage(messageExtList); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID); + this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); + } + return true; + } + + private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) { + return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID; + } + + public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); + ProcessQueue processQueue = processQueues.get(localOffsetKey); + if (processQueue != null) { + log.info("Clear local expire message for {} in processQueue.", localOffsetKey); + processQueue.cleanExpiredMsg(this.defaultMQPushConsumer); + } + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop != null) { + if (pullStop.get()) { + pullStop.set(false); + log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey); + } + } + return true; + } + + public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop == null) { + this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); + log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); + return true; + } + if (!pullStop.get()) { + pullStop.set(true); + log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); + } + return true; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java new file mode 100644 index 0000000000000000000000000000000000000000..d29667df425aa9c8982db1bc7e634837014d53e4 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import org.apache.rocketmq.client.consumer.MQRealPushConsumer; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.stat.ConsumerStatsManager; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.exception.RemotingException; + +/** + * Push Consumer inner interface + */ +public interface MQPushConsumerInner extends MQConsumerInner { + + MQRealPushConsumer getDefaultMQPushConsumer(); + + OffsetStore getOffsetStore(); + + boolean isConsumeOrderly(); + + MQClientInstance getmQClientFactory(); + + boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID); + + void executePullRequestImmediately(final PullRequest pullRequest); + + RebalanceImpl getRebalanceImpl(); + + ConsumerStatsManager getConsumerStatsManager(); + + void sendMessageBack(MessageExt msg, int delayLevel, + final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + boolean hasHook(); + + void registerConsumeMessageHook(final ConsumeMessageHook hook); + + void executeHookBefore(final ConsumeMessageContext context); + + void executeHookAfter(final ConsumeMessageContext context); + + void pullMessage(final PullRequest pullRequest); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 0a52817ced7d9350c9a38dfc3fe358c193307364..661bf03ac8cd2784f52299bce0fa8c68518c092b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.MQRealPushConsumer; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; @@ -73,7 +75,7 @@ public class ProcessQueue { /** * @param pushConsumer */ - public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { + public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index bd46a58859acf59714eeba794ad5fa4ac247bebf..d58eed3b166218ab23b4e3daa062a6e367a42c75 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread { private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { - DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; + MQPushConsumerInner impl; + if (consumer instanceof DefaultMQRealPushConsumerImpl) { + impl = (DefaultMQRealPushConsumerImpl) consumer; + } else { + impl = (DefaultMQPushConsumerImpl) consumer; + } impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 2e622be88a7f0f59e542389be6ad81855196e7d3..8768ffaea0e1e64a4a70afcffaddf289f5d3d7b5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; @@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public class RebalancePushImpl extends RebalanceImpl { private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); - private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + private final MQPushConsumerInner defaultMQPushConsumerImpl; - public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) { this(null, null, null, null, defaultMQPushConsumerImpl); } public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java index 8bd27019dd06804bee576622f08c4d521d8910be..8ee30a1de95b7b75dda45c7a9c1619cd740c975d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java @@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; public class RebalanceRealPushImpl extends RebalancePushImpl { - public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) { super(defaultMQPushConsumerImpl); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 0953fdc13470b548c5860c4170fc884ddc1b7bee..d5dcb3974eb1e56f20d8b686fdca0bcd2a1e054d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.PullMessageService; @@ -1387,7 +1388,7 @@ public class MQClientInstance { consumerGroup, topic, brokerName, queueID, offset); MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); if (null != mqConsumerInner) { - DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; + DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner; consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset); return true; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 87a795e4b4e24f3d84189f69bb838a34b1478152..dd795cd2562c6d8ca01de941f1c1b6e25c495584 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.log.ClientLogger; @@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private volatile Thread shutDownHook; private volatile boolean stopped = false; private DefaultMQProducerImpl hostProducer; - private DefaultMQPushConsumerImpl hostConsumer; + private MQPushConsumerInner hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; @@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.hostProducer = hostProducer; } - public DefaultMQPushConsumerImpl getHostConsumer() { + public MQPushConsumerInner getHostConsumer() { return hostConsumer; } - public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) { + public void setHostConsumer(MQPushConsumerInner hostConsumer) { this.hostConsumer = hostConsumer; } diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index df3b3b08d8064acd6f1662cd4e2ced194d1cfffc..233b002bfa4196c9067f3684615a79181ac94835 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.example.quickstart; import java.util.List; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; /** - * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}. + * This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}. */ public class Consumer { @@ -35,7 +35,7 @@ public class Consumer { /* * Instantiate with specified consumer group name. */ - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); + DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello"); /* * Specify name server addresses. @@ -61,7 +61,6 @@ public class Consumer { /* * Register callback to execute on arrival of messages fetched from brokers. */ -// consumer.setNamesrvAddr("47.102.149.193:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 98dcd61ca1afd0ab6f4f2074d8612ab47feea1df..80c27f407cb6f41de9402635e815389145621db7 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -47,7 +47,6 @@ public class Producer { /* * Launch the instance. */ -// producer.setNamesrvAddr("47.102.149.193:9876"); producer.start(); for (int i = 0; i < 10; i++) { @@ -76,7 +75,7 @@ public class Producer { /* * Shut down once the producer instance is not longer in use. */ - Thread.sleep(30000L); + Thread.sleep(3000L); producer.shutdown(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index abbfbdffcdda01b60ff3dc3dd1aee3b628849510..29a51acaa269a9e1795afac0a05f19321ee63275 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.example.simple; import java.util.List; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); + DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800