提交 86c9f100 编写于 作者: D dongeforever

Fix code to RpcRequest

上级 4070da8a
...@@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
requestHeader.setTimestamp(timestamp); requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId()); requestHeader.setQueueId(item.getQueueId());
requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
requestHeader.setBname(item.getBname()); requestHeader.setBname(item.getBname());
RpcRequest rpcRequest = new RpcRequest(requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
...@@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
//TODO check if it is leader //TODO check if it is leader
RpcRequest rpcRequest = new RpcRequest(requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
...@@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
//TODO check if it is leader //TODO check if it is leader
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
......
...@@ -145,8 +145,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -145,8 +145,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
requestHeader.setBname(bname); requestHeader.setBname(bname);
requestHeader.setCode(RequestCode.PULL_MESSAGE); RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader { public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private Long timestamp; private Long timestamp;
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMinOffsetResponseHeader extends RpcHeader { public class GetMinOffsetResponseHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private Long offset; private Long offset;
......
...@@ -20,13 +20,12 @@ ...@@ -20,13 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageResponseHeader extends RpcHeader { public class PullMessageResponseHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private Long suggestWhichBrokerId; private Long suggestWhichBrokerId;
@CFNotNull @CFNotNull
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class SearchOffsetResponseHeader extends RpcHeader { public class SearchOffsetResponseHeader implements CommandCustomHeader {
@CFNotNull @CFNotNull
private Long offset; private Long offset;
......
...@@ -25,7 +25,6 @@ public class RequestBuilder { ...@@ -25,7 +25,6 @@ public class RequestBuilder {
} }
try { try {
RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway); requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
return requestHeader; return requestHeader;
...@@ -53,7 +52,6 @@ public class RequestBuilder { ...@@ -53,7 +52,6 @@ public class RequestBuilder {
} }
try { try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway); requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
......
...@@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient { ...@@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
String addr = getBrokerAddrByNameOrException(request.getHeader().bname); String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
Promise<RpcResponse> rpcResponsePromise = null; Promise<RpcResponse> rpcResponsePromise = null;
try { try {
switch (request.getHeader().getCode()) { switch (request.getCode()) {
case RequestCode.PULL_MESSAGE: case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
break; break;
default: default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode()); throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
} }
} catch (RpcException rpcException) { } catch (RpcException rpcException) {
throw rpcException; throw rpcException;
...@@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.PULL_OFFSET_MOVED: case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader = PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
responseHeader.setCode(responseCommand.getCode()); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody()));
default: default:
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse); rpcResponsePromise.setSuccess(rpcResponse);
...@@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader = SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
responseHeader.setCode(responseCommand.getCode()); return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
...@@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader = GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
responseHeader.setCode(responseCommand.getCode()); return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
...@@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader = GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
responseHeader.setCode(responseCommand.getCode()); return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
......
...@@ -8,13 +8,13 @@ import java.nio.ByteBuffer; ...@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
public class RpcClientUtils { public class RpcClientUtils {
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getHeader().getCode(), rpcRequest.getHeader()); RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
cmd.setBody(encodeBody(rpcRequest.getBody())); cmd.setBody(encodeBody(rpcRequest.getBody()));
return cmd; return cmd;
} }
public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getHeader().getCode(), rpcResponse.getHeader()); RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
cmd.setBody(encodeBody(rpcResponse.getBody())); cmd.setBody(encodeBody(rpcResponse.getBody()));
return cmd; return cmd;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class RpcHeader implements CommandCustomHeader {
protected int code;
public RpcHeader() {
}
public RpcHeader(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
@Override
public void checkFields() throws RemotingCommandException {
}
}
...@@ -17,10 +17,12 @@ ...@@ -17,10 +17,12 @@
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public class RpcRequest { public class RpcRequest {
int code;
private RpcRequestHeader header; private RpcRequestHeader header;
private Object body; private Object body;
public RpcRequest(RpcRequestHeader header, Object body) { public RpcRequest(int code, RpcRequestHeader header, Object body) {
this.code = code;
this.header = header; this.header = header;
this.body = body; this.body = body;
} }
...@@ -32,4 +34,8 @@ public class RpcRequest { ...@@ -32,4 +34,8 @@ public class RpcRequest {
public Object getBody() { public Object getBody() {
return body; return body;
} }
public int getCode() {
return code;
}
} }
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public abstract class RpcRequestHeader extends RpcHeader { import org.apache.rocketmq.remoting.CommandCustomHeader;
public abstract class RpcRequestHeader implements CommandCustomHeader {
//the namespace name //the namespace name
protected String namespace; protected String namespace;
//if the data has been namespaced //if the data has been namespaced
......
...@@ -16,8 +16,11 @@ ...@@ -16,8 +16,11 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
public class RpcResponse { public class RpcResponse {
private RpcHeader header; private int code;
private CommandCustomHeader header;
private Object body; private Object body;
public RpcException exception; public RpcException exception;
...@@ -25,21 +28,26 @@ public class RpcResponse { ...@@ -25,21 +28,26 @@ public class RpcResponse {
} }
public RpcResponse(RpcHeader header, byte[] body) { public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
this.code = code;
this.header = header; this.header = header;
this.body = body; this.body = body;
} }
public RpcResponse(RpcException rpcException) { RpcResponse(RpcException rpcException) {
this.header = new RpcHeader(rpcException.getErrorCode()); this.code = rpcException.getErrorCode();
this.exception = rpcException; this.exception = rpcException;
} }
public RpcHeader getHeader() { public int getCode() {
return code;
}
public CommandCustomHeader getHeader() {
return header; return header;
} }
public void setHeader(RpcHeader header) { public void setHeader(CommandCustomHeader header) {
this.header = header; this.header = header;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册