diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java index 6b9b3b21d20664b3ac67a22c2e991681419ed7aa..10e7d822c5172865a510bbb319051ba690afd64d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java @@ -20,11 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader { +public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader { @CFNotNull private Long timestamp; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java index 6fc0fac3b0594afde717cfb6cb2c5a3eca5b81d2..57cb0505e0a85d9543ea93d6e63664431e23c2e3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java @@ -20,11 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMinOffsetResponseHeader implements CommandCustomHeader { +public class GetMinOffsetResponseHeader extends RpcHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index 88af984df816947e2d7fc77226da29887581480a..e2f896b739d20b0f1a001f2239899d528e87fd61 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -20,12 +20,13 @@ */ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class PullMessageResponseHeader implements CommandCustomHeader { +public class PullMessageResponseHeader extends RpcHeader { @CFNotNull private Long suggestWhichBrokerId; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java index f88ac6852f41ada7e3e32f333736ea14ed644ba6..416f7f73425bbe6044afd08a9bfbe24b2ade222b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java @@ -20,11 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; +import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SearchOffsetResponseHeader implements CommandCustomHeader { +public class SearchOffsetResponseHeader extends RpcHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index 7655871dd6bb20c223bfafa050b2eda2c75aac49..a791543eb4af532d39797700487e0f3476b631b7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -14,17 +14,17 @@ public class RequestBuilder { requestCodeMap.put(RequestCode.PULL_MESSAGE, PullMessageRequestHeader.class); } - public static CommonRpcHeader buildCommonRpcHeader(int requestCode, String destBrokerName) { + public static RpcRequestHeader buildCommonRpcHeader(int requestCode, String destBrokerName) { return buildCommonRpcHeader(requestCode, null, destBrokerName); } - public static CommonRpcHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) { + public static RpcRequestHeader buildCommonRpcHeader(int requestCode, Boolean oneway, String destBrokerName) { Class requestHeaderClass = requestCodeMap.get(requestCode); if (requestHeaderClass == null) { throw new UnsupportedOperationException("unknown " + requestCode); } try { - CommonRpcHeader requestHeader = (CommonRpcHeader) requestHeaderClass.newInstance(); + RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 81d5d044300f36b8de5768608f5853f87e41b2c2..8d557a19a6dd2568b52d896afdd2e48f5698ed27 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -1,9 +1,5 @@ package org.apache.rocketmq.common.rpc; -import com.google.common.util.concurrent.Futures; -import com.sun.org.apache.xpath.internal.functions.FuncPosition; -import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.apache.rocketmq.common.message.MessageQueue; @@ -20,10 +16,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; public class RpcClientImpl implements RpcClient { @@ -68,12 +61,12 @@ public class RpcClientImpl implements RpcClient { String addr = getBrokerAddrByNameOrException(request.getHeader().bname); Promise rpcResponsePromise = null; try { - switch (request.getCode()) { + switch (request.getHeader().getCode()) { case RequestCode.PULL_MESSAGE: rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); break; default: - throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); + throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode()); } } catch (RpcException rpcException) { throw rpcException; @@ -135,7 +128,8 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.PULL_OFFSET_MOVED: PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); - rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + responseHeader.setCode(responseCommand.getCode()); + rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody())); default: RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); rpcResponsePromise.setSuccess(rpcResponse); @@ -162,11 +156,11 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + responseHeader.setCode(responseCommand.getCode()); + return new RpcResponse(responseHeader, responseCommand.getBody()); } default:{ - RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); - rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error")); + RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); return rpcResponse; } } @@ -183,11 +177,11 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + responseHeader.setCode(responseCommand.getCode()); + return new RpcResponse(responseHeader, responseCommand.getBody()); } default:{ - RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); - rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error")); + RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); return rpcResponse; } } @@ -204,12 +198,12 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + responseHeader.setCode(responseCommand.getCode()); + return new RpcResponse(responseHeader, responseCommand.getBody()); } default:{ - RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null); - rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error")); + RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); return rpcResponse; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java index 3ae06b7c61eea9815400d7383a867b729f9528e3..654b2ed258d8d0accee0b61d3dc46f22b76781f5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java @@ -8,13 +8,13 @@ import java.nio.ByteBuffer; public class RpcClientUtils { public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { - RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader()); + RemotingCommand cmd = RemotingCommand.createRequestCommandWithHeader(rpcRequest.getHeader().getCode(), rpcRequest.getHeader()); cmd.setBody(encodeBody(rpcRequest.getBody())); return cmd; } public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { - RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader()); + RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader()); cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); cmd.setBody(encodeBody(rpcResponse.getBody())); return cmd; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..18b64d56e512c8e30cd50fbd124f46fe147f33ae --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.rpc; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class RpcHeader implements CommandCustomHeader { + + + protected int code; + + public RpcHeader() { + } + + public RpcHeader(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + @Override + public void checkFields() throws RemotingCommandException { + + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java index 06d5c79f7717c880472945cc7e26537b9a7d6952..3ebe3fe29c1e600e82f7cffa31133f115f21da97 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java @@ -17,15 +17,15 @@ package org.apache.rocketmq.common.rpc; public class RpcRequest { - private CommonRpcHeader header; + private RpcRequestHeader header; private Object body; - public RpcRequest(CommonRpcHeader header, Object body) { + public RpcRequest(RpcRequestHeader header, Object body) { this.header = header; this.body = body; } - public CommonRpcHeader getHeader() { + public RpcRequestHeader getHeader() { return header; } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java similarity index 85% rename from common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java rename to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java index df62d52f590a464c36fa3166f71b4a66a964c934..c5c748d5c5c7f53b1805abe82b159064eee4fa3b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/CommonRpcHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java @@ -16,16 +16,11 @@ */ package org.apache.rocketmq.common.rpc; -import org.apache.rocketmq.remoting.CommandCustomHeader; - -public abstract class CommonRpcHeader implements CommandCustomHeader { +public abstract class RpcRequestHeader extends RpcHeader { //the namespace name protected String namespace; //if the data has been namespaced protected Boolean namespaced; - - protected int code; - //the abstract remote addr name, usually the physical broker name protected String bname; @@ -62,12 +57,4 @@ public abstract class CommonRpcHeader implements CommandCustomHeader { public void setNamespaced(Boolean namespaced) { this.namespaced = namespaced; } - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java index 601f942df1a2d583735477d6933637bb85fb9efa..be7cf9b23bdaa34970a5a27dacdda5f1e16f9dd1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java @@ -16,11 +16,8 @@ */ package org.apache.rocketmq.common.rpc; -import org.apache.rocketmq.remoting.CommandCustomHeader; - public class RpcResponse { - private int code; - private CommandCustomHeader header; + private RpcHeader header; private Object body; public RpcException exception; @@ -28,29 +25,32 @@ public class RpcResponse { } - public RpcResponse(int code, CommandCustomHeader header, byte[] body) { - this.code = code; + public RpcResponse(RpcHeader header, byte[] body) { this.header = header; this.body = body; } public RpcResponse(RpcException rpcException) { - this.code = rpcException.getErrorCode(); + this.header = new RpcHeader(rpcException.getErrorCode()); this.exception = rpcException; } - public int getCode() { - return code; + public RpcHeader getHeader() { + return header; } - public CommandCustomHeader getHeader() { - return header; + public void setHeader(RpcHeader header) { + this.header = header; } public Object getBody() { return body; } + public void setBody(Object body) { + this.body = body; + } + public RpcException getException() { return exception; } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java index 4b0a394360dc728f10b3016b9d7f8f2d46794095..5d0a151b151f5fe3586fb0b6ba873f7048b49c65 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.common.rpc; -public abstract class TopicQueueRequestHeader extends CommonRpcHeader { +public abstract class TopicQueueRequestHeader extends RpcRequestHeader { //Physical or Logical protected Boolean physical; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 65ffaf5f0739c86daa9a58a2ffeb07083854dd7a..08bb202c3324daa307dfde2ddcac780c96eafc26 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -87,7 +87,7 @@ public class RemotingCommand { } - public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + public static RemotingCommand createRequestCommandWithHeader(int code, CommandCustomHeader customHeader) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(code); cmd.customHeader = customHeader; @@ -95,7 +95,7 @@ public class RemotingCommand { return cmd; } - public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) { + public static RemotingCommand createResponseCommandWithHeader(int code, CommandCustomHeader customHeader) { RemotingCommand cmd = new RemotingCommand(); cmd.setCode(code); cmd.markResponseType();