未验证 提交 0e7d2712 编写于 作者: H Heng Du 提交者: GitHub

fix(pull_schedual) add the namespace wrapper in callback (#1804)

上级 b6281144
...@@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
...@@ -95,7 +96,7 @@ public class MQPullConsumerScheduleService { ...@@ -95,7 +96,7 @@ public class MQPullConsumerScheduleService {
} }
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
this.callbackTable.put(topic, callback); this.callbackTable.put(NamespaceUtil.wrapNamespace(this.defaultMQPullConsumer.getNamespace(), topic), callback);
this.defaultMQPullConsumer.registerMessageQueueListener(topic, null); this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册