From 8d49c16df87e93bdf1e7a25a27716c559f6e34e4 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 11 Nov 2021 16:18:08 +0800 Subject: [PATCH] Polish the rpc usage for PullProcessor --- .../rocketmq/broker/out/BrokerOuterAPI.java | 49 +++++-------------- .../processor/PullMessageProcessor.java | 24 ++++----- .../remoting/protocol/RemotingCommand.java | 34 ++++++++----- 3 files changed, 43 insertions(+), 64 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index c5f8b90a..5bb499ef 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -474,29 +474,17 @@ public class BrokerOuterAPI { this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback); } - public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception { - + private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException { String addr = this.brokerController.getBrokerAddrByName(bname); if (addr == null) { - return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR, - String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName), - PullMessageResponseHeader.class); + throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr); } - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - assert response != null; - return response; - + return addr; } public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = this.brokerController.getBrokerAddrByName(bname); - if (addr == null) { - RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); - rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); - return rpcResponse; - } - RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest ); + String addr = getBrokerAddrByNameOrException(bname); + RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; @@ -516,13 +504,8 @@ public class BrokerOuterAPI { } public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = this.brokerController.getBrokerAddrByName(bname); - if (addr == null) { - RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); - rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); - return rpcResponse; - } - RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + String addr = getBrokerAddrByNameOrException(bname); + RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; switch (responseCommand.getCode()) { @@ -540,14 +523,9 @@ public class BrokerOuterAPI { } public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = this.brokerController.getBrokerAddrByName(bname); - if (addr == null) { - RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); - rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); - return rpcResponse; - } + String addr = getBrokerAddrByNameOrException(bname); - RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; @@ -566,14 +544,9 @@ public class BrokerOuterAPI { } public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = this.brokerController.getBrokerAddrByName(bname); - if (addr == null) { - RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null); - rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr)); - return rpcResponse; - } + String addr = getBrokerAddrByNameOrException(bname); - RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); + RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 79869b98..3f2a6824 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; @@ -65,6 +66,8 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RpcRequest; +import org.apache.rocketmq.remoting.RpcResponse; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -141,27 +144,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } requestHeader.setPhysical(true); - RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout()); - switch (response.getCode()) { - case ResponseCode.SYSTEM_ERROR: - return response; - case ResponseCode.SUCCESS: - case ResponseCode.PULL_NOT_FOUND: - case ResponseCode.PULL_RETRY_IMMEDIATELY: - case ResponseCode.PULL_OFFSET_MOVED: - break; - default: - throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname()); + RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); } - PullMessageResponseHeader responseHeader = - (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + + PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader(); { RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } } - return response; + return RemotingCommand.createCommandForRpcResponse(rpcResponse); } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 8fc3f9e3..e0611804 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -17,21 +17,22 @@ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson.annotation.JSONField; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.RpcRequest; +import org.apache.rocketmq.remoting.RpcResponse; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.RpcRequest; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; @@ -87,7 +88,15 @@ public class RemotingCommand { protected RemotingCommand() { } - public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) { + public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + RemotingCommand cmd = new RemotingCommand(); + cmd.setCode(code); + cmd.customHeader = customHeader; + setCmdVersion(cmd); + return cmd; + } + + public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(rpcRequest.getCode()); cmd.customHeader = rpcRequest.getHeader(); @@ -96,10 +105,11 @@ public class RemotingCommand { return cmd; } - public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { RemotingCommand cmd = new RemotingCommand(); - cmd.setCode(code); - cmd.customHeader = customHeader; + cmd.markResponseType(); + cmd.setCode(rpcResponse.getCode()); + cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); setCmdVersion(cmd); return cmd; } -- GitLab