提交 8d49c16d 编写于 作者: D dongeforever

Polish the rpc usage for PullProcessor

上级 311d5f4a
...@@ -474,29 +474,17 @@ public class BrokerOuterAPI { ...@@ -474,29 +474,17 @@ public class BrokerOuterAPI {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback); this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
} }
public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception { private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
String addr = this.brokerController.getBrokerAddrByName(bname); String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) { if (addr == null) {
return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR, throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName),
PullMessageResponseHeader.class);
} }
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); return addr;
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return response;
} }
public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname); String addr = getBrokerAddrByNameOrException(bname);
if (addr == null) { RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest );
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
...@@ -516,13 +504,8 @@ public class BrokerOuterAPI { ...@@ -516,13 +504,8 @@ public class BrokerOuterAPI {
} }
public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname); String addr = getBrokerAddrByNameOrException(bname);
if (addr == null) { RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
switch (responseCommand.getCode()) { switch (responseCommand.getCode()) {
...@@ -540,14 +523,9 @@ public class BrokerOuterAPI { ...@@ -540,14 +523,9 @@ public class BrokerOuterAPI {
} }
public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname); String addr = getBrokerAddrByNameOrException(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
...@@ -566,14 +544,9 @@ public class BrokerOuterAPI { ...@@ -566,14 +544,9 @@ public class BrokerOuterAPI {
} }
public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname); String addr = getBrokerAddrByNameOrException(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest); RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null; assert responseCommand != null;
......
...@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.help.FAQUrl; ...@@ -51,6 +51,7 @@ import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
...@@ -65,6 +66,8 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag; ...@@ -65,6 +66,8 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
...@@ -141,27 +144,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -141,27 +144,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader, this.brokerController.getBrokerConfig().getForwardTimeout()); RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
switch (response.getCode()) { RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(bname, rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
case ResponseCode.SYSTEM_ERROR: if (rpcResponse.getException() != null) {
return response; throw rpcResponse.getException();
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname());
} }
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
{ {
RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
} }
return response; return RemotingCommand.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) { } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
......
...@@ -17,21 +17,22 @@ ...@@ -17,21 +17,22 @@
package org.apache.rocketmq.remoting.protocol; package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.fastjson.annotation.JSONField;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class RemotingCommand { public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
...@@ -87,7 +88,15 @@ public class RemotingCommand { ...@@ -87,7 +88,15 @@ public class RemotingCommand {
protected RemotingCommand() { protected RemotingCommand() {
} }
public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) { public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
cmd.setCode(rpcRequest.getCode()); cmd.setCode(rpcRequest.getCode());
cmd.customHeader = rpcRequest.getHeader(); cmd.customHeader = rpcRequest.getHeader();
...@@ -96,10 +105,11 @@ public class RemotingCommand { ...@@ -96,10 +105,11 @@ public class RemotingCommand {
return cmd; return cmd;
} }
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code); cmd.markResponseType();
cmd.customHeader = customHeader; cmd.setCode(rpcResponse.getCode());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
setCmdVersion(cmd); setCmdVersion(cmd);
return cmd; return cmd;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册