diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 44b864e3487b59a576dcbfdc6bbde2afef075c90..5436688052cc52634782d5f4f344933470159973 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -26,9 +26,10 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; /** * Schedule service for pull consumer @@ -49,6 +50,11 @@ public class MQPullConsumerScheduleService { this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); } + public MQPullConsumerScheduleService(final String consumerGroup, final RPCHook rpcHook) { + this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook); + this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + } + public void putTask(String topic, Set mqNewSet) { Iterator> it = this.taskTable.entrySet().iterator(); while (it.hasNext()) {