diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index f999eae983eda93ade34ce7d86962bd46354cff8..91e9f887b6dd1b510f5afdafb1ad478d9806e5cb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -99,17 +99,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume *
CONSUME_FROM_LAST_OFFSET
: consumer clients pick up where it stopped previously.
- * If it were a newly booting up consumer client, according aging of the consumer group, there are two
- * cases:
+ * If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* - * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on - * {@code pullThresholdForTopic} if it is't unlimited + * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code + * pullThresholdForTopic} if it is't unlimited *
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, * then pullThresholdForQueue will be set to 100 @@ -201,11 +197,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Limit the cached message size on topic level, default value is -1 MiB(Unlimited) *
- * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on - * {@code pullThresholdSizeForTopic} if it is't unlimited + * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code + * pullThresholdSizeForTopic} if it is't unlimited *
- * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
- * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
+ * consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
@@ -280,16 +276,37 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
- * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
+ * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
+ *
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ * @param allocateMessageQueueStrategy Message queue allocating algorithm.
+ */
+ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
+ this.consumerGroup = consumerGroup;
+ if (allocateMessageQueueStrategy == null) {
+ this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+ } else {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ }
+ defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
+ }
+
+ /**
+ * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
+ * customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
@@ -315,7 +332,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
-
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
@@ -331,9 +347,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
+ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
@@ -632,8 +650,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
- * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
- * if null or * expression,meaning subscribe all
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
+ * null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 5c22a8f6c359caf42a23fc7a964bcc5d9dda8113..392a2f08153bd4b36d68d98e2e10a6aa4811de4f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -114,6 +114,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private boolean realPushModel = true;
private final ConcurrentHashMap