diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index cd8a179e3130a7c008d7914f112a3a4e7c0cce11..384eef0741867890491f891a0182a60957a57819 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -54,6 +54,9 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestH import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.rpc.RpcClient; +import org.apache.rocketmq.common.rpc.RpcClientImpl; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; @@ -81,15 +84,21 @@ public class BrokerOuterAPI { private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); + private ClientMetadata clientMetadata; + private RpcClient rpcClient; + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) { - this(nettyClientConfig, null, brokerController); + + this(nettyClientConfig, null, brokerController, new ClientMetadata()); } - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) { + private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController, ClientMetadata clientMetadata) { this.remotingClient = new NettyRemotingClient(nettyClientConfig); + this.clientMetadata = clientMetadata; this.remotingClient.registerRPCHook(rpcHook); this.brokerController = brokerController; this.currBrokerName = brokerController.getBrokerConfig().getBrokerName(); + this.rpcClient = new RpcClientImpl(this.clientMetadata, this.remotingClient); } public void start() { @@ -468,5 +477,11 @@ public class BrokerOuterAPI { } + public ClientMetadata getClientMetadata() { + return clientMetadata; + } + public RpcClient getRpcClient() { + return rpcClient; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index d8cd1792b557e908e518c3cc2ce972c940fb2c45..0cd4668ea75a03a7fada409c4d57bfe9ccbbcc03 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -144,8 +144,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } requestHeader.setPhysical(true); - RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + requestHeader.setBname(bname); + requestHeader.setCode(RequestCode.PULL_MESSAGE); + RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java index bf58a5774744e38a098d301ea22a57cc6f86ce09..df62d52f590a464c36fa3166f71b4a66a964c934 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java @@ -24,6 +24,8 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { //if the data has been namespaced protected Boolean namespaced; + protected int code; + //the abstract remote addr name, usually the physical broker name protected String bname; @@ -60,4 +62,12 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { public void setNamespaced(Boolean namespaced) { this.namespaced = namespaced; } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index aa69b17a0b2ff63b7d796053c387637ff7d1fc3f..7655871dd6bb20c223bfafa050b2eda2c75aac49 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -25,6 +25,7 @@ public class RequestBuilder { } try { CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance(); + requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); return requestHeader; @@ -52,6 +53,7 @@ public class RequestBuilder { } try { TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); + requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); requestHeader.setTopic(topic); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java index e1cf010f4727a4d3730549ca75fa2f67cb4aa9ec..06d5c79f7717c880472945cc7e26537b9a7d6952 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java @@ -17,20 +17,14 @@ package org.apache.rocketmq.common.rpc; public class RpcRequest { - private int code; private CommonRpcHeader header; private Object body; - public RpcRequest(int code, CommonRpcHeader header, Object body) { - this.code = code; + public RpcRequest(CommonRpcHeader header, Object body) { this.header = header; this.body = body; } - public int getCode() { - return code; - } - public CommonRpcHeader getHeader() { return header; }