From 27296f1d222fe4d42cb2e8a8d94c28cc6edaf632 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Mon, 25 Feb 2019 09:55:54 +0800 Subject: [PATCH] Fix the incompatibility issue of 4.x client in broker --- .../rocketmq/broker/processor/SendMessageProcessor.java | 7 ++++++- .../java/org/apache/rocketmq/snode/SnodeController.java | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index e5bd8e7f..7dd907f4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -82,7 +82,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; - SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); + SocketAddress bornHost = null; + if (requestHeader.getBornHost() != null) { + bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); + } else { + bornHost = ctx.channel().remoteAddress(); + } if (requestHeader.isBatch()) { response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 6911d9a8..1d6397c0 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -326,8 +326,9 @@ public class SnodeController { this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); - } public void start() { -- GitLab