diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index e5bd8e7f84fec2e66929e988d059651f31e7eb23..7dd907f478dd2a05ad8117009cfccf2030ba084e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -82,7 +82,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; - SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); + SocketAddress bornHost = null; + if (requestHeader.getBornHost() != null) { + bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); + } else { + bornHost = ctx.channel().remoteAddress(); + } if (requestHeader.isBatch()) { response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 6911d9a889cb5fd6144a44e631fa6bd369918cc7..1d6397c032c3d008982ce2246eb40edb837f639e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -326,8 +326,9 @@ public class SnodeController { this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); - } public void start() {