From f32e0b9dc37b52e89ce75cf419e2933a8cccde06 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Fri, 13 Jan 2017 11:20:29 +0800 Subject: [PATCH] Add javadoc to DefaultMQPushConsumer --- .../consumer/DefaultMQPushConsumer.java | 187 +++++++++++++++++- 1 file changed, 178 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 45f23a7e..2cce03d3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; /** - * Wrapped push consumer.in fact,it works as remarkable as the pull consumer + * In most scenarios, this is the mostly recommended class to consume messages. + *

+ * + * Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on + * arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages. + *

+ * + * See quickstart/Consumer in the example module for a typical usage. + *

+ * + *

+ * Thread Safety: After initialization, the instance can be regarded as thread-safe. + *

*/ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { + + /** + * Internal implementation. Most of the functions herein are delegated to it. + */ protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + /** - * Do the same thing for the same Group, the application must be set,and - * guarantee Globally unique + * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve + * load balance. It's required and needs to be globally unique. + *

+ * + * See here for further discussion. */ private String consumerGroup; + /** - * Consumption pattern,default is clustering + * Message model defines the way how messages are delivered to each consumer clients. + *

+ * + * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with + * the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load + * balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages + * separately. + *

+ * + * This field defaults to clustering. */ private MessageModel messageModel = MessageModel.CLUSTERING; + /** - * Consumption offset + * Consuming point on consumer booting. + *

+ * + * There are three consuming points: + * */ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + /** - * Backtracking consumption time with second precision.time format is + * Backtracking consumption time with second precision. Time format is * 20131223171201
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year
- * Default backtracking consumption time Half an hour ago + * Default backtracking consumption time Half an hour ago. */ private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + /** - * Queue allocation algorithm + * Queue allocation algorithm specifying how message queues are allocated to each consumer clients. */ private AllocateMessageQueueStrategy allocateMessageQueueStrategy; /** * Subscription relationship */ - private Map subscription = new HashMap(); + private Map subscription = new HashMap<>(); + /** * Message listener */ private MessageListener messageListener; + /** * Offset Storage */ private OffsetStore offsetStore; + /** * Minimum consumer thread number */ private int consumeThreadMin = 20; + /** * Max consumer thread number */ @@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Concurrently max span offset.it has no effect on sequential consumption */ private int consumeConcurrentlyMaxSpan = 2000; + /** * Flow control threshold */ private int pullThresholdForQueue = 1000; + /** * Message pull Interval */ private long pullInterval = 0; + /** * Batch consumption size */ private int consumeMessageBatchMaxSize = 1; + /** * Batch pull size */ @@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private boolean unitMode = false; + /** + * Max re-consume times. -1 means 16 times. + *

+ * + * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion + * queue waiting. + */ private int maxReconsumeTimes = -1; + + /** + * Suspending pulling time for cases requiring slow pulling like flow-control scenario. + */ private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Maximum amount of time in minutes a message may block the consuming thread. + */ private long consumeTimeout = 15; + /** + * Default constructor. + */ public DefaultMQPushConsumer() { this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } + /** + * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } + /** + * Constructor specifying RPC hook. + * @param rpcHook RPC hook to execute before each remoting command. + */ public DefaultMQPushConsumer(RPCHook rpcHook) { this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); } + /** + * Constructor specifying consumer group. + * @param consumerGroup Consumer group. + */ public DefaultMQPushConsumer(final String consumerGroup) { this(consumerGroup, null, new AllocateMessageQueueAveragely()); } @@ -308,12 +409,33 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.subscription = subscription; } + /** + * Send message back to broker which will be re-delivered in future. + * @param msg Message to send back. + * @param delayLevel delay level. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ @Override public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); } + /** + * Send message back to the broker whose name is brokerName and the message will be re-delivered in + * future. + * + * @param msg Message to send back. + * @param delayLevel delay level. + * @param brokerName broker name. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -325,11 +447,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); } + /** + * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. + * @throws MQClientException if there is any client error. + */ @Override public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); } + /** + * Shut down this client and releasing underlying resources. + */ @Override public void shutdown() { this.defaultMQPushConsumerImpl.shutdown(); @@ -342,43 +471,83 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Register a callback to execute on message arrival for concurrent consuming. + * + * @param messageListener message handling callback. + */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Register a callback to execute on message arrival for orderly consuming. + * + * @param messageListener message handling callback. + */ @Override public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Subscribe a topic to consuming subscription. + * + * @param topic topic to subscribe. + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
+ * if null or * expression,meaning subscribe all + * @throws MQClientException if there is any client error. + */ @Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); } + /** + * Subscribe a topic to consuming subscription. + * @param topic topic to consume. + * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter + * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety + * @throws MQClientException + */ @Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); } + /** + * Un-subscribe the specified topic from subscription. + * @param topic message topic + */ @Override public void unsubscribe(String topic) { this.defaultMQPushConsumerImpl.unsubscribe(topic); } + /** + * Update the message consuming thread core pool size. + * + * @param corePoolSize new core pool size. + */ @Override public void updateCorePoolSize(int corePoolSize) { this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); } + /** + * Suspend pulling new messages. + */ @Override public void suspend() { this.defaultMQPushConsumerImpl.suspend(); } + /** + * Resume pulling. + */ @Override public void resume() { this.defaultMQPushConsumerImpl.resume(); -- GitLab