未验证 提交 509bce4d 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #864 from ShannonDing/snode

Polish the push consumer to support 4.x
......@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
......@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
private boolean postSubscriptionWhenPull = true;
private boolean postSubscriptionWhenPull = false;
/**
* Whether the unit of subscription group
......@@ -275,24 +275,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* In most scenarios, this is the mostly recommended class to consume messages.
* </p>
*
* Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on
* arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages.
* </p>
*
* See quickstart/Consumer in the example module for a typical usage.
* </p>
*
* <p>
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
protected final transient DefaultMQRealPushConsumerImpl defaultMQPushConsumerImpl;
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
* </p>
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
private String consumerGroup;
/**
* Message model defines the way how messages are delivered to each consumer clients.
* </p>
*
* RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
* the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
* balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
* separately.
* </p>
*
* This field defaults to clustering.
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* Consuming point on consumer booting.
* </p>
*
* There are three consuming points:
* <ul>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
* means the consumer group represents a lately launched business, consuming will start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
* messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
* <li>
* <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
* </li>
* <li>
* <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
* messages born prior to {@link #consumeTimestamp} will be ignored
* </li>
* </ul>
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
* and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* Queue allocation algorithm specifying how message queues are allocated to each consumer clients.
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* Subscription relationship
*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
/**
* Message listener
*/
private MessageListener messageListener;
/**
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* Minimum consumer thread number
*/
private int consumeThreadMin = 20;
/**
* Max consumer thread number
*/
private int consumeThreadMax = 64;
/**
* Threshold for dynamic adjustment of the number of thread pool
*/
private long adjustThreadPoolNumsThreshold = 100000;
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
* pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
*/
private int pullThresholdForTopic = -1;
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
* pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
* consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
*/
private long pullInterval = 0;
/**
* Batch consumption size
*/
private int consumeMessageBatchMaxSize = 1;
/**
* Batch pull size
*/
private int pullBatchSize = 32;
/**
* Whether update subscription relationship when every pull
*/
private boolean postSubscriptionWhenPull = true;
/**
* Whether the unit of subscription group
*/
private boolean unitMode = false;
/**
* Max re-consume times. -1 means 16 times.
* </p>
*
* If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
* queue waiting.
*/
private int maxReconsumeTimes = -1;
/**
* Suspending pulling time for cases requiring slow pulling like flow-control scenario.
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
private long consumeTimeout = 15;
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
public DefaultMQRealPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQRealPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
/**
* Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQRealPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
}
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
}
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(mq);
}
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
}
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
}
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
} catch (Exception e) {
// Ignore
}
return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
}
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
public int getConsumeConcurrentlyMaxSpan() {
return consumeConcurrentlyMaxSpan;
}
public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
}
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
public int getConsumeMessageBatchMaxSize() {
return consumeMessageBatchMaxSize;
}
public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public int getConsumeThreadMax() {
return consumeThreadMax;
}
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}
public int getConsumeThreadMin() {
return consumeThreadMin;
}
public void setConsumeThreadMin(int consumeThreadMin) {
this.consumeThreadMin = consumeThreadMin;
}
public MQPushConsumerInner getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
}
public MessageListener getMessageListener() {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
public MessageModel getMessageModel() {
return messageModel;
}
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
public int getPullBatchSize() {
return pullBatchSize;
}
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
public long getPullInterval() {
return pullInterval;
}
public void setPullInterval(long pullInterval) {
this.pullInterval = pullInterval;
}
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
public void setPullThresholdForTopic(final int pullThresholdForTopic) {
this.pullThresholdForTopic = pullThresholdForTopic;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
}
public int getPullThresholdSizeForTopic() {
return pullThresholdSizeForTopic;
}
public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) {
this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
}
public Map<String, String> getSubscription() {
return subscription;
}
public void setSubscription(Map<String, String> subscription) {
this.subscription = subscription;
}
/**
* Send message back to broker which will be re-delivered in future.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
/**
* Send message back to the broker whose name is <code>brokerName</code> and the message will be re-delivered in
* future.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
}
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
}
/**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
*
* @throws MQClientException if there is any client error.
*/
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
/**
* Shut down this client and releasing underlying resources.
*/
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
@Override
@Deprecated
public void registerMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
/**
* Register a callback to execute on message arrival for concurrent consuming.
*
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
/**
* Register a callback to execute on message arrival for orderly consuming.
*
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to consume.
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
*/
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}
/**
* Subscribe a topic by message selector.
*
* @param topic topic to consume.
* @param messageSelector {@link MessageSelector}
* @see MessageSelector#bySql
* @see MessageSelector#byTag
*/
@Override
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
}
/**
* Un-subscribe the specified topic from subscription.
*
* @param topic message topic
*/
@Override
public void unsubscribe(String topic) {
this.defaultMQPushConsumerImpl.unsubscribe(topic);
}
/**
* Update the message consuming thread core pool size.
*
* @param corePoolSize new core pool size.
*/
@Override
public void updateCorePoolSize(int corePoolSize) {
this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
}
/**
* Suspend pulling new messages.
*/
@Override
public void suspend() {
this.defaultMQPushConsumerImpl.suspend();
}
/**
* Resume pulling.
*/
@Override
public void resume() {
this.defaultMQPushConsumerImpl.resume();
}
public OffsetStore getOffsetStore() {
return offsetStore;
}
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
public String getConsumeTimestamp() {
return consumeTimestamp;
}
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
public boolean isPostSubscriptionWhenPull() {
return postSubscriptionWhenPull;
}
public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
this.postSubscriptionWhenPull = postSubscriptionWhenPull;
}
public boolean isUnitMode() {
return unitMode;
}
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
public long getAdjustThreadPoolNumsThreshold() {
return adjustThreadPoolNumsThreshold;
}
public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
}
public int getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public long getConsumeTimeout() {
return consumeTimeout;
}
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
}
......@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
/**
* Push consumer
......@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* null or * expression,meaning subscribe all
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
* This method will be removed in the version 5.0.0,because filterServer was removed,and method
* <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*
* Subscribe some topic
*
......@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Subscribe some topic with selector.
* <p>
* This interface also has the ability of {@link #subscribe(String, String)},
* and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection,
* such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* </p>
* <p/>
* <p>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Real Push consumer
*/
public interface MQRealPushConsumer extends MQPushConsumer {
MQPushConsumerInner getDefaultMQPushConsumerImpl();
long getConsumeTimeout();
int getPullThresholdForTopic();
int getPullThresholdForQueue();
void setPullThresholdForQueue(int pullThresholdForQueue);
int getPullThresholdSizeForTopic();
void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic);
int getPullThresholdSizeForQueue();
void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue);
ConsumeFromWhere getConsumeFromWhere();
String getConsumeTimestamp();
String getConsumerGroup();
int getConsumeThreadMin();
void setConsumeThreadMin(int consumeThreadMin);
int getConsumeThreadMax();
long getSuspendCurrentQueueTimeMillis();
int getMaxReconsumeTimes();
void setMaxReconsumeTimes(final int maxReconsumeTimes);
int getConsumeMessageBatchMaxSize();
MessageModel getMessageModel();
}
\ No newline at end of file
......@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
......@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
......@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService cleanExpireMsgExecutors;
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
......
......@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
......@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
......@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
......
......@@ -26,10 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public class DefaultMQPushConsumerImpl implements MQPushConsumerInner {
/**
* Delay some time when exception occur
*/
......@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
......@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
if (realPushModel) {
log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this);
} else {
rebalanceImpl = new RebalancePushImpl(this);
}
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
......@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
......@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
......@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
......@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
......@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default:
break;
}
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
......@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
}
}
}
......@@ -1020,12 +971,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public ConsumeType consumeType() {
if (realPushModel) {
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
}
@Override
public ConsumeFromWhere consumeFromWhere() {
......@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
}
return true;
}
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue != null) {
log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
}
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop != null) {
if (pullStop.get()) {
pullStop.set(false);
log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
}
}
return true;
}
public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
return true;
}
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
/**
* Delay some time when exception occur
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
/**
* Flow control interval
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
/**
* Delay some time when suspend pull service
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQRealPushConsumer defaultMQPushConsumer;
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
private boolean consumeOrderly = false;
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this);
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
this.filterMessageHookList.add(hook);
log.info("register FilterMessageHook Hook, {}", hook.hookName());
}
public boolean hasHook() {
return !this.consumeMessageHookList.isEmpty();
}
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register consumeMessageHook Hook, {}", hook.hookName());
}
public void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable e) {
}
}
}
}
public void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
}
}
}
}
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
}
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
}
if (null == result) {
throw new MQClientException("The topic[" + topic + "] not exist", null);
}
return result;
}
public MQRealPushConsumer getDefaultMQPushConsumer() {
return defaultMQPushConsumer;
}
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
}
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
}
public long minOffset(MessageQueue mq) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
}
public OffsetStore getOffsetStore() {
return offsetStore;
}
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQRealPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQRealPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQRealPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQRealPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQRealPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQRealPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
public boolean isPause() {
return pause;
}
public void setPause(boolean pause) {
this.pause = pause;
}
public ConsumerStatsManager getConsumerStatsManager() {
return this.mQClientFactory.getConsumerStatsManager();
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
private void correctTagsOffset(final PullRequest pullRequest) {
if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
}
}
public void executeTaskLater(final Runnable r, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
public void resume() {
this.pause = false;
doRebalance();
log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
private int getMaxReconsumeTimes() {
// default reconsume times: 16
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return 16;
} else {
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
}
}
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown();
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
this.mQClientFactory.rebalanceImmediately();
}
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
throw new MQClientException(
"consumerGroup is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
"consumerGroup can not equal "
+ MixAll.DEFAULT_CONSUMER_GROUP
+ ", please specify another one."
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (null == this.defaultMQPushConsumer.getMessageModel()) {
throw new MQClientException(
"messageModel is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
throw new MQClientException(
"consumeFromWhere is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS);
if (null == dt) {
throw new MQClientException(
"consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received "
+ this.defaultMQPushConsumer.getConsumeTimestamp()
+ " " + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
}
// allocateMessageQueueStrategy
if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
"allocateMessageQueueStrategy is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// subscription
if (null == this.defaultMQPushConsumer.getSubscription()) {
throw new MQClientException(
"subscription is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// messageListener
if (null == this.defaultMQPushConsumer.getMessageListener()) {
throw new MQClientException(
"messageListener is null"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
if (!orderly && !concurrently) {
throw new MQClientException(
"messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
throw new MQClientException(
"consumeThreadMax Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumeThreadMin can't be larger than consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
throw new MQClientException(
"consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
null);
}
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
throw new MQClientException(
"consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// pullThresholdForQueue
if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
throw new MQClientException(
"pullThresholdForQueue Out of range [1, 65535]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// pullThresholdForTopic
if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
throw new MQClientException(
"pullThresholdForTopic Out of range [1, 6553500]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullThresholdSizeForQueue
if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
throw new MQClientException(
"pullThresholdSizeForQueue Out of range [1, 1024]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
// pullThresholdSizeForTopic
if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
throw new MQClientException(
"pullThresholdSizeForTopic Out of range [1, 102400]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
"pullInterval Out of range [0, 65535]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumeMessageBatchMaxSize
if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
|| this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
throw new MQClientException(
"consumeMessageBatchMaxSize Out of range [1, 1024]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// pullBatchSize
if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
throw new MQClientException(
"pullBatchSize Out of range [1, 1024]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public MessageListener getMessageListenerInner() {
return messageListenerInner;
}
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
}
}
}
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return this.rebalanceImpl.getSubscriptionInner();
}
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
try {
if (messageSelector == null) {
subscribe(topic, SubscriptionData.SUB_ALL);
return;
}
SubscriptionData subscriptionData = FilterAPI.build(topic,
messageSelector.getExpression(), messageSelector.getExpressionType());
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public void suspend() {
this.pause = true;
log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
}
public void unsubscribe(String topic) {
this.rebalanceImpl.getSubscriptionInner().remove(topic);
}
public void updateConsumeOffset(MessageQueue mq, long offset) {
this.offsetStore.updateOffset(mq, offset, false);
}
public void updateCorePoolSize(int corePoolSize) {
this.consumeMessageService.updateCorePoolSize(corePoolSize);
}
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}
public RebalanceImpl getRebalanceImpl() {
return rebalanceImpl;
}
public boolean isConsumeOrderly() {
return consumeOrderly;
}
public void setConsumeOrderly(boolean consumeOrderly) {
this.consumeOrderly = consumeOrderly;
}
public void resetOffsetByTimeStamp(long timeStamp)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
if (mqs != null) {
for (MessageQueue mq : mqs) {
long offset = searchOffset(mq, timeStamp);
offsetTable.put(mq, offset);
}
this.mQClientFactory.resetOffset(topic, groupName(), offsetTable);
}
}
}
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
}
@Override
public String groupName() {
return this.defaultMQPushConsumer.getConsumerGroup();
}
@Override
public MessageModel messageModel() {
return this.defaultMQPushConsumer.getMessageModel();
}
@Override
public ConsumeType consumeType() {
if (realPushModel) {
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
}
@Override
public ConsumeFromWhere consumeFromWhere() {
return this.defaultMQPushConsumer.getConsumeFromWhere();
}
@Override
public Set<SubscriptionData> subscriptions() {
Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
return subSet;
}
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
@Override
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
@Override
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
}
}
}
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
}
return false;
}
@Override
public boolean isUnitMode() {
return this.defaultMQPushConsumer.isUnitMode();
}
@Override
public ConsumerRunningInfo consumerRunningInfo() {
ConsumerRunningInfo info = new ConsumerRunningInfo();
Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);
prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));
prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
info.setProperties(prop);
Set<SubscriptionData> subSet = this.subscriptions();
info.getSubscriptionSet().addAll(subSet);
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
ProcessQueueInfo pqinfo = new ProcessQueueInfo();
pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
pq.fillProcessQueueInfo(pqinfo);
info.getMqTable().put(mq, pqinfo);
}
for (SubscriptionData sd : subSet) {
ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
info.getStatusTable().put(sd.getTopic(), consumeStatus);
}
return info;
}
public MQClientInstance getmQClientFactory() {
return mQClientFactory;
}
public void setmQClientFactory(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
public ServiceState getServiceState() {
return serviceState;
}
//Don't use this deprecated setter, which will be removed soon.
@Deprecated
public synchronized void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}
public void adjustThreadPool() {
long computeAccTotal = this.computeAccumulationTotal();
long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
if (computeAccTotal >= incThreshold) {
this.consumeMessageService.incCorePoolSize();
}
if (computeAccTotal < decThreshold) {
this.consumeMessageService.decCorePoolSize();
}
}
private long computeAccumulationTotal() {
long msgAccTotal = 0;
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue value = next.getValue();
msgAccTotal += value.getMsgAccCnt();
}
return msgAccTotal;
}
public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
for (BrokerData brokerData : routeData.getBrokerDatas()) {
String addr = brokerData.selectBrokerAddr();
queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000));
}
return queueTimeSpan;
}
public ConsumeMessageService getConsumeMessageService() {
return consumeMessageService;
}
public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
this.consumeMessageService = consumeMessageService;
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
}
return true;
}
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue != null) {
log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
}
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop != null) {
if (pullStop.get()) {
pullStop.set(false);
log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
}
}
return true;
}
public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
return true;
}
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Push Consumer inner interface
*/
public interface MQPushConsumerInner extends MQConsumerInner {
MQRealPushConsumer getDefaultMQPushConsumer();
OffsetStore getOffsetStore();
boolean isConsumeOrderly();
MQClientInstance getmQClientFactory();
boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID);
void executePullRequestImmediately(final PullRequest pullRequest);
RebalanceImpl getRebalanceImpl();
ConsumerStatsManager getConsumerStatsManager();
void sendMessageBack(MessageExt msg, int delayLevel,
final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
boolean hasHook();
void registerConsumeMessageHook(final ConsumeMessageHook hook);
void executeHookBefore(final ConsumeMessageContext context);
void executeHookAfter(final ConsumeMessageContext context);
void pullMessage(final PullRequest pullRequest);
}
......@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -73,7 +75,7 @@ public class ProcessQueue {
/**
* @param pushConsumer
*/
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
......
......@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
MQPushConsumerInner impl;
if (consumer instanceof DefaultMQRealPushConsumerImpl) {
impl = (DefaultMQRealPushConsumerImpl) consumer;
} else {
impl = (DefaultMQPushConsumerImpl) consumer;
}
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
......
......@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
}
......
......@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl {
public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl);
}
......
......@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
......@@ -1387,7 +1388,7 @@ public class MQClientInstance {
consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true;
}
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
......@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private MQPushConsumerInner hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
......@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
public MQPushConsumerInner getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
public void setHostConsumer(MQPushConsumerInner hostConsumer) {
this.hostConsumer = hostConsumer;
}
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
* This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}.
*/
public class Consumer {
......@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello");
/*
* Specify name server addresses.
......@@ -61,7 +61,6 @@ public class Consumer {
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
// consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
......
......@@ -47,7 +47,6 @@ public class Producer {
/*
* Launch the instance.
*/
// producer.setNamesrvAddr("47.102.149.193:9876");
producer.start();
for (int i = 0; i < 10; i++) {
......@@ -76,7 +75,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
Thread.sleep(30000L);
Thread.sleep(3000L);
producer.shutdown();
}
}
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
......@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册