From 0e7d27120c3262b51029eb0e049fa613eab0c8d0 Mon Sep 17 00:00:00 2001 From: Heng Du Date: Fri, 28 Feb 2020 15:43:36 +0800 Subject: [PATCH] fix(pull_schedual) add the namespace wrapper in callback (#1804) --- .../client/consumer/MQPullConsumerScheduleService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 4d57313f..6a5e714f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -95,7 +96,7 @@ public class MQPullConsumerScheduleService { } public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { - this.callbackTable.put(topic, callback); + this.callbackTable.put(NamespaceUtil.wrapNamespace(this.defaultMQPullConsumer.getNamespace(), topic), callback); this.defaultMQPullConsumer.registerMessageQueueListener(topic, null); } -- GitLab