提交 6381e6e1 编写于 作者: D dongeforever

Rename the rpc header

上级 86b3ff7b
...@@ -20,11 +20,12 @@ ...@@ -20,11 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader { public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader {
@CFNotNull @CFNotNull
private Long timestamp; private Long timestamp;
......
...@@ -20,11 +20,12 @@ ...@@ -20,11 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMinOffsetResponseHeader implements CommandCustomHeader { public class GetMinOffsetResponseHeader extends RpcHeader {
@CFNotNull @CFNotNull
private Long offset; private Long offset;
......
...@@ -20,12 +20,13 @@ ...@@ -20,12 +20,13 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageResponseHeader implements CommandCustomHeader { public class PullMessageResponseHeader extends RpcHeader {
@CFNotNull @CFNotNull
private Long suggestWhichBrokerId; private Long suggestWhichBrokerId;
@CFNotNull @CFNotNull
......
...@@ -20,11 +20,12 @@ ...@@ -20,11 +20,12 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class SearchOffsetResponseHeader implements CommandCustomHeader { public class SearchOffsetResponseHeader extends RpcHeader {
@CFNotNull @CFNotNull
private Long offset; private Long offset;
......
...@@ -14,17 +14,17 @@ public class RequestBuilder { ...@@ -14,17 +14,17 @@ public class RequestBuilder {
requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class); requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class);
} }
public static CommonRpcHeader buildCommonRpcHeader(int requestCode, String destBrokerName) { public static RpcRequestHeader buildCommonRpcHeader(int requestCode, String destBrokerName) {
return buildCommonRpcHeader(requestCode, null, destBrokerName); return buildCommonRpcHeader(requestCode, null, destBrokerName);
} }
public static CommonRpcHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) { public static RpcRequestHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) {
Class requestHeaderClass = requestCodeMap.get(requestCode); Class requestHeaderClass = requestCodeMap.get(requestCode);
if (requestHeaderClass == null) { if (requestHeaderClass == null) {
throw new UnsupportedOperationException("unknown " + requestCode); throw new UnsupportedOperationException("unknown " + requestCode);
} }
try { try {
CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance(); RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
requestHeader.setCode(requestCode); requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway); requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
......
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import com.google.common.util.concurrent.Futures;
import com.sun.org.apache.xpath.internal.functions.FuncPosition;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -20,10 +16,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -20,10 +16,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
public class RpcClientImpl implements RpcClient { public class RpcClientImpl implements RpcClient {
...@@ -68,12 +61,12 @@ public class RpcClientImpl implements RpcClient { ...@@ -68,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
String addr = getBrokerAddrByNameOrException(request.getHeader().bname); String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
Promise<RpcResponse> rpcResponsePromise = null; Promise<RpcResponse> rpcResponsePromise = null;
try { try {
switch (request.getCode()) { switch (request.getHeader().getCode()) {
case RequestCode.PULL_MESSAGE: case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
break; break;
default: default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode());
} }
} catch (RpcException rpcException) { } catch (RpcException rpcException) {
throw rpcException; throw rpcException;
...@@ -135,7 +128,8 @@ public class RpcClientImpl implements RpcClient { ...@@ -135,7 +128,8 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.PULL_OFFSET_MOVED: case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader = PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); responseHeader.setCode(responseCommand.getCode());
rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody()));
default: default:
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse); rpcResponsePromise.setSuccess(rpcResponse);
...@@ -162,11 +156,11 @@ public class RpcClientImpl implements RpcClient { ...@@ -162,11 +156,11 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader = SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); responseHeader.setCode(responseCommand.getCode());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse; return rpcResponse;
} }
} }
...@@ -183,11 +177,11 @@ public class RpcClientImpl implements RpcClient { ...@@ -183,11 +177,11 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader = GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); responseHeader.setCode(responseCommand.getCode());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse; return rpcResponse;
} }
} }
...@@ -204,12 +198,12 @@ public class RpcClientImpl implements RpcClient { ...@@ -204,12 +198,12 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader = GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); responseHeader.setCode(responseCommand.getCode());
return new RpcResponse(responseHeader, responseCommand.getBody());
} }
default:{ default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"));
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse; return rpcResponse;
} }
} }
......
...@@ -8,13 +8,13 @@ import java.nio.ByteBuffer; ...@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
public class RpcClientUtils { public class RpcClientUtils {
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader()); RemotingCommand cmd = RemotingCommand.createRequestCommandWithHeader(rpcRequest.getHeader().getCode(), rpcRequest.getHeader());
cmd.setBody(encodeBody(rpcRequest.getBody())); cmd.setBody(encodeBody(rpcRequest.getBody()));
return cmd; return cmd;
} }
public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader()); RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
cmd.setBody(encodeBody(rpcResponse.getBody())); cmd.setBody(encodeBody(rpcResponse.getBody()));
return cmd; return cmd;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class RpcHeader implements CommandCustomHeader {
protected int code;
public RpcHeader() {
}
public RpcHeader(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
@Override
public void checkFields() throws RemotingCommandException {
}
}
...@@ -17,15 +17,15 @@ ...@@ -17,15 +17,15 @@
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public class RpcRequest { public class RpcRequest {
private CommonRpcHeader header; private RpcRequestHeader header;
private Object body; private Object body;
public RpcRequest(CommonRpcHeader header, Object body) { public RpcRequest(RpcRequestHeader header, Object body) {
this.header = header; this.header = header;
this.body = body; this.body = body;
} }
public CommonRpcHeader getHeader() { public RpcRequestHeader getHeader() {
return header; return header;
} }
......
...@@ -16,16 +16,11 @@ ...@@ -16,16 +16,11 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader; public abstract class RpcRequestHeader extends RpcHeader {
public abstract class CommonRpcHeader implements CommandCustomHeader {
//the namespace name //the namespace name
protected String namespace; protected String namespace;
//if the data has been namespaced //if the data has been namespaced
protected Boolean namespaced; protected Boolean namespaced;
protected int code;
//the abstract remote addr name, usually the physical broker name //the abstract remote addr name, usually the physical broker name
protected String bname; protected String bname;
...@@ -62,12 +57,4 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { ...@@ -62,12 +57,4 @@ public abstract class CommonRpcHeader implements CommandCustomHeader {
public void setNamespaced(Boolean namespaced) { public void setNamespaced(Boolean namespaced) {
this.namespaced = namespaced; this.namespaced = namespaced;
} }
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
} }
...@@ -16,11 +16,8 @@ ...@@ -16,11 +16,8 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
public class RpcResponse { public class RpcResponse {
private int code; private RpcHeader header;
private CommandCustomHeader header;
private Object body; private Object body;
public RpcException exception; public RpcException exception;
...@@ -28,29 +25,32 @@ public class RpcResponse { ...@@ -28,29 +25,32 @@ public class RpcResponse {
} }
public RpcResponse(int code, CommandCustomHeader header, byte[] body) { public RpcResponse(RpcHeader header, byte[] body) {
this.code = code;
this.header = header; this.header = header;
this.body = body; this.body = body;
} }
public RpcResponse(RpcException rpcException) { public RpcResponse(RpcException rpcException) {
this.code = rpcException.getErrorCode(); this.header = new RpcHeader(rpcException.getErrorCode());
this.exception = rpcException; this.exception = rpcException;
} }
public int getCode() { public RpcHeader getHeader() {
return code; return header;
} }
public CommandCustomHeader getHeader() { public void setHeader(RpcHeader header) {
return header; this.header = header;
} }
public Object getBody() { public Object getBody() {
return body; return body;
} }
public void setBody(Object body) {
this.body = body;
}
public RpcException getException() { public RpcException getException() {
return exception; return exception;
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public abstract class TopicQueueRequestHeader extends CommonRpcHeader { public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
//Physical or Logical //Physical or Logical
protected Boolean physical; protected Boolean physical;
......
...@@ -87,7 +87,7 @@ public class RemotingCommand { ...@@ -87,7 +87,7 @@ public class RemotingCommand {
} }
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { public static RemotingCommand createRequestCommandWithHeader(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code); cmd.setCode(code);
cmd.customHeader = customHeader; cmd.customHeader = customHeader;
...@@ -95,7 +95,7 @@ public class RemotingCommand { ...@@ -95,7 +95,7 @@ public class RemotingCommand {
return cmd; return cmd;
} }
public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) { public static RemotingCommand createResponseCommandWithHeader(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code); cmd.setCode(code);
cmd.markResponseType(); cmd.markResponseType();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册