提交 27296f1d 编写于 作者: D duhenglucky

Fix the incompatibility issue of 4.x client in broker

上级 923a7666
...@@ -82,7 +82,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -82,7 +82,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.executeSendMessageHookBefore(ctx, request, mqtraceContext); this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response; RemotingCommand response;
SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); SocketAddress bornHost = null;
if (requestHeader.getBornHost() != null) {
bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost());
} else {
bornHost = ctx.channel().remoteAddress();
}
if (requestHeader.isBatch()) { if (requestHeader.isBatch()) {
response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader);
......
...@@ -326,8 +326,9 @@ public class SnodeController { ...@@ -326,8 +326,9 @@ public class SnodeController {
this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
} }
public void start() { public void start() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册