diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 4d57313f8fa215b6b0fba0b3239c13b01d7425b3..6a5e714fbba9e77f4c7f03d8a2c9cf297fcf0c93 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -95,7 +96,7 @@ public class MQPullConsumerScheduleService { } public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { - this.callbackTable.put(topic, callback); + this.callbackTable.put(NamespaceUtil.wrapNamespace(this.defaultMQPullConsumer.getNamespace(), topic), callback); this.defaultMQPullConsumer.registerMessageQueueListener(topic, null); }