diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 45f23a7ef0cc12a2fb45d68dfdd1b770b421fb2f..2cce03d347389c4b23476c8886664b5975a1f6ae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
- * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
+ * In most scenarios, this is the mostly recommended class to consume messages.
+ *
+ *
+ * Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on
+ * arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages.
+ *
+ *
+ * See quickstart/Consumer in the example module for a typical usage.
+ *
+ *
+ *
+ * Thread Safety: After initialization, the instance can be regarded as thread-safe.
+ *
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+
+ /**
+ * Internal implementation. Most of the functions herein are delegated to it.
+ */
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+
/**
- * Do the same thing for the same Group, the application must be set,and
- * guarantee Globally unique
+ * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
+ * load balance. It's required and needs to be globally unique.
+ *
+ *
+ * See here for further discussion.
*/
private String consumerGroup;
+
/**
- * Consumption pattern,default is clustering
+ * Message model defines the way how messages are delivered to each consumer clients.
+ *
+ *
+ * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+ * the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
+ * balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
+ * separately.
+ *
+ *
+ * This field defaults to clustering.
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
+
/**
- * Consumption offset
+ * Consuming point on consumer booting.
+ *
+ *
+ * There are three consuming points:
+ *
+ * -
+ *
CONSUME_FROM_LAST_OFFSET
: consumer clients pick up where it stopped previously.
+ * If it were a newly booting up consumer client, according aging of the consumer group, there are two
+ * cases:
+ *
+ * -
+ * if the consumer group is created so recently that the earliest message being subscribed has yet
+ * expired, which means the consumer group represents a lately launched business, consuming will
+ * start from the very beginning;
+ *
+ * -
+ * if the earliest message being subscribed has expired, consuming will start from the latest
+ * messages, meaning messages born prior to the booting timestamp would be ignored.
+ *
+ *
+ *
+ * -
+ *
CONSUME_FROM_FIRST_OFFSET
: Consumer client will start from earliest messages available.
+ *
+ * -
+ *
CONSUME_FROM_TIMESTAMP
: Consumer client will start from specified timestamp, which means
+ * messages born prior to {@link #consumeTimestamp} will be ignored
+ *
+ *
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
/**
- * Backtracking consumption time with second precision.time format is
+ * Backtracking consumption time with second precision. Time format is
* 20131223171201
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year
- * Default backtracking consumption time Half an hour ago
+ * Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
/**
- * Queue allocation algorithm
+ * Queue allocation algorithm specifying how message queues are allocated to each consumer clients.
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* Subscription relationship
*/
- private Map subscription = new HashMap();
+ private Map subscription = new HashMap<>();
+
/**
* Message listener
*/
private MessageListener messageListener;
+
/**
* Offset Storage
*/
private OffsetStore offsetStore;
+
/**
* Minimum consumer thread number
*/
private int consumeThreadMin = 20;
+
/**
* Max consumer thread number
*/
@@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
+
/**
* Flow control threshold
*/
private int pullThresholdForQueue = 1000;
+
/**
* Message pull Interval
*/
private long pullInterval = 0;
+
/**
* Batch consumption size
*/
private int consumeMessageBatchMaxSize = 1;
+
/**
* Batch pull size
*/
@@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private boolean unitMode = false;
+ /**
+ * Max re-consume times. -1 means 16 times.
+ *
+ *
+ * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
+ * queue waiting.
+ */
private int maxReconsumeTimes = -1;
+
+ /**
+ * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
+ */
private long suspendCurrentQueueTimeMillis = 1000;
+
+ /**
+ * Maximum amount of time in minutes a message may block the consuming thread.
+ */
private long consumeTimeout = 15;
+ /**
+ * Default constructor.
+ */
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
+ /**
+ * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ * @param allocateMessageQueueStrategy message queue allocating algorithm.
+ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
+ /**
+ * Constructor specifying RPC hook.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
+ /**
+ * Constructor specifying consumer group.
+ * @param consumerGroup Consumer group.
+ */
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
@@ -308,12 +409,33 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.subscription = subscription;
}
+ /**
+ * Send message back to broker which will be re-delivered in future.
+ * @param msg Message to send back.
+ * @param delayLevel delay level.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
+ /**
+ * Send message back to the broker whose name is brokerName
and the message will be re-delivered in
+ * future.
+ *
+ * @param msg Message to send back.
+ * @param delayLevel delay level.
+ * @param brokerName broker name.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -325,11 +447,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
}
+ /**
+ * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
+ /**
+ * Shut down this client and releasing underlying resources.
+ */
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
@@ -342,43 +471,83 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
+ /**
+ * Register a callback to execute on message arrival for concurrent consuming.
+ *
+ * @param messageListener message handling callback.
+ */
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
+ /**
+ * Register a callback to execute on message arrival for orderly consuming.
+ *
+ * @param messageListener message handling callback.
+ */
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
+ /**
+ * Subscribe a topic to consuming subscription.
+ *
+ * @param topic topic to subscribe.
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
+ * if null or * expression,meaning subscribe all
+ * @throws MQClientException if there is any client error.
+ */
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
+ /**
+ * Subscribe a topic to consuming subscription.
+ * @param topic topic to consume.
+ * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
+ * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
+ * @throws MQClientException
+ */
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}
+ /**
+ * Un-subscribe the specified topic from subscription.
+ * @param topic message topic
+ */
@Override
public void unsubscribe(String topic) {
this.defaultMQPushConsumerImpl.unsubscribe(topic);
}
+ /**
+ * Update the message consuming thread core pool size.
+ *
+ * @param corePoolSize new core pool size.
+ */
@Override
public void updateCorePoolSize(int corePoolSize) {
this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
}
+ /**
+ * Suspend pulling new messages.
+ */
@Override
public void suspend() {
this.defaultMQPushConsumerImpl.suspend();
}
+ /**
+ * Resume pulling.
+ */
@Override
public void resume() {
this.defaultMQPushConsumerImpl.resume();