提交 da12c9ed 编写于 作者: D dongeforever

Correct the invoke for PullMessageProcessor

上级 077cf09d
...@@ -54,6 +54,9 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestH ...@@ -54,6 +54,9 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestH
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientImpl;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
...@@ -81,15 +84,21 @@ public class BrokerOuterAPI { ...@@ -81,15 +84,21 @@ public class BrokerOuterAPI {
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
private ClientMetadata clientMetadata;
private RpcClient rpcClient;
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) { public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final BrokerController brokerController) {
this(nettyClientConfig, null, brokerController);
this(nettyClientConfig, null, brokerController, new ClientMetadata());
} }
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController) { private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook, final BrokerController brokerController, ClientMetadata clientMetadata) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig); this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.clientMetadata = clientMetadata;
this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerRPCHook(rpcHook);
this.brokerController = brokerController; this.brokerController = brokerController;
this.currBrokerName = brokerController.getBrokerConfig().getBrokerName(); this.currBrokerName = brokerController.getBrokerConfig().getBrokerName();
this.rpcClient = new RpcClientImpl(this.clientMetadata, this.remotingClient);
} }
public void start() { public void start() {
...@@ -468,5 +477,11 @@ public class BrokerOuterAPI { ...@@ -468,5 +477,11 @@ public class BrokerOuterAPI {
} }
public ClientMetadata getClientMetadata() {
return clientMetadata;
}
public RpcClient getRpcClient() {
return rpcClient;
}
} }
...@@ -144,8 +144,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -144,8 +144,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); requestHeader.setBname(bname);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); requestHeader.setCode(RequestCode.PULL_MESSAGE);
RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
......
...@@ -24,6 +24,8 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { ...@@ -24,6 +24,8 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
//if the data has been namespaced //if the data has been namespaced
protected Boolean namespaced; protected Boolean namespaced;
protected int code;
//the abstract remote addr name, usually the physical broker name //the abstract remote addr name, usually the physical broker name
protected String bname; protected String bname;
...@@ -60,4 +62,12 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { ...@@ -60,4 +62,12 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
public void setNamespaced(Boolean namespaced) { public void setNamespaced(Boolean namespaced) {
this.namespaced = namespaced; this.namespaced = namespaced;
} }
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
} }
...@@ -25,6 +25,7 @@ public class RequestBuilder { ...@@ -25,6 +25,7 @@ public class RequestBuilder {
} }
try { try {
CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance(); CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway); requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
return requestHeader; return requestHeader;
...@@ -52,6 +53,7 @@ public class RequestBuilder { ...@@ -52,6 +53,7 @@ public class RequestBuilder {
} }
try { try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway); requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
......
...@@ -17,20 +17,14 @@ ...@@ -17,20 +17,14 @@
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public class RpcRequest { public class RpcRequest {
private int code;
private CommonRpcHeader header; private CommonRpcHeader header;
private Object body; private Object body;
public RpcRequest(int code, CommonRpcHeader header, Object body) { public RpcRequest(CommonRpcHeader header, Object body) {
this.code = code;
this.header = header; this.header = header;
this.body = body; this.body = body;
} }
public int getCode() {
return code;
}
public CommonRpcHeader getHeader() { public CommonRpcHeader getHeader() {
return header; return header;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册