From c40cdf09c0a891adc4b8ce871d6846f3c132d617 Mon Sep 17 00:00:00 2001 From: Heng Du Date: Thu, 11 Jul 2019 19:59:31 +0800 Subject: [PATCH] Add RPCHook construct method for MQPullConsumerScheduleService (#1314) --- .../client/consumer/MQPullConsumerScheduleService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 44b864e3..54366880 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -26,9 +26,10 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; /** * Schedule service for pull consumer @@ -49,6 +50,11 @@ public class MQPullConsumerScheduleService { this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); } + public MQPullConsumerScheduleService(final String consumerGroup, final RPCHook rpcHook) { + this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook); + this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + } + public void putTask(String topic, Set mqNewSet) { Iterator> it = this.taskTable.entrySet().iterator(); while (it.hasNext()) { -- GitLab