diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0a8f58cfaa9699595633a96389349b99b14b59d3..3584316039064b087298a7e1592aedaf60ee1229 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setTimestamp(timestamp); requestHeader.setQueueId(item.getQueueId()); - requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP); requestHeader.setBname(item.getBname()); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - requestHeader.setCode(RequestCode.GET_MIN_OFFSET); requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); //TODO check if it is leader - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - requestHeader.setCode(RequestCode.GET_MIN_OFFSET); requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); //TODO check if it is leader RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 0cd4668ea75a03a7fada409c4d57bfe9ccbbcc03..df89889c985c5f06092a0ca624d61b56e47ee039 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -145,8 +145,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setBname(bname); - requestHeader.setCode(RequestCode.PULL_MESSAGE); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java index 10e7d822c5172865a510bbb319051ba690afd64d..6b9b3b21d20664b3ac67a22c2e991681419ed7aa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader { +public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader { @CFNotNull private Long timestamp; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java index 57cb0505e0a85d9543ea93d6e63664431e23c2e3..6fc0fac3b0594afde717cfb6cb2c5a3eca5b81d2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMinOffsetResponseHeader extends RpcHeader { +public class GetMinOffsetResponseHeader implements CommandCustomHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index e2f896b739d20b0f1a001f2239899d528e87fd61..88af984df816947e2d7fc77226da29887581480a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -20,13 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class PullMessageResponseHeader extends RpcHeader { +public class PullMessageResponseHeader implements CommandCustomHeader { @CFNotNull private Long suggestWhichBrokerId; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java index 416f7f73425bbe6044afd08a9bfbe24b2ade222b..f88ac6852f41ada7e3e32f333736ea14ed644ba6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SearchOffsetResponseHeader extends RpcHeader { +public class SearchOffsetResponseHeader implements CommandCustomHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index a791543eb4af532d39797700487e0f3476b631b7..4b5c62b8009303293d4af3c085d8c3f39774bbd1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -25,7 +25,6 @@ public class RequestBuilder { } try { RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); return requestHeader; @@ -53,7 +52,6 @@ public class RequestBuilder { } try { TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); requestHeader.setTopic(topic); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 8d557a19a6dd2568b52d896afdd2e48f5698ed27..245cc3a3930baad4a1dbcdde7d10418d58bf99c3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient { String addr = getBrokerAddrByNameOrException(request.getHeader().bname); Promise rpcResponsePromise = null; try { - switch (request.getHeader().getCode()) { + switch (request.getCode()) { case RequestCode.PULL_MESSAGE: rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); break; default: - throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode()); + throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); } } catch (RpcException rpcException) { throw rpcException; @@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.PULL_OFFSET_MOVED: PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody())); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); default: RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); rpcResponsePromise.setSuccess(rpcResponse); @@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); @@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); @@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java index df14c225c7a41bc415444e4056b62e3f783db105..ebae12b519776f9100ead388a19eed5f1b0e0a0a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java @@ -8,13 +8,13 @@ import java.nio.ByteBuffer; public class RpcClientUtils { public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { - RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getHeader().getCode(), rpcRequest.getHeader()); + RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader()); cmd.setBody(encodeBody(rpcRequest.getBody())); return cmd; } public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { - RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getHeader().getCode(), rpcResponse.getHeader()); + RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader()); cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); cmd.setBody(encodeBody(rpcResponse.getBody())); return cmd; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java deleted file mode 100644 index 18b64d56e512c8e30cd50fbd124f46fe147f33ae..0000000000000000000000000000000000000000 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.common.rpc; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class RpcHeader implements CommandCustomHeader { - - - protected int code; - - public RpcHeader() { - } - - public RpcHeader(int code) { - this.code = code; - } - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } - - @Override - public void checkFields() throws RemotingCommandException { - - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java index 3ebe3fe29c1e600e82f7cffa31133f115f21da97..90bb696e37960855f9c767aac7c17f93a94f86c0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java @@ -17,10 +17,12 @@ package org.apache.rocketmq.common.rpc; public class RpcRequest { + int code; private RpcRequestHeader header; private Object body; - public RpcRequest(RpcRequestHeader header, Object body) { + public RpcRequest(int code, RpcRequestHeader header, Object body) { + this.code = code; this.header = header; this.body = body; } @@ -32,4 +34,8 @@ public class RpcRequest { public Object getBody() { return body; } + + public int getCode() { + return code; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java index c5c748d5c5c7f53b1805abe82b159064eee4fa3b..577865ecd50be7e972ef5ef4a36e3566ffdf9efb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.common.rpc; -public abstract class RpcRequestHeader extends RpcHeader { +import org.apache.rocketmq.remoting.CommandCustomHeader; + +public abstract class RpcRequestHeader implements CommandCustomHeader { //the namespace name protected String namespace; //if the data has been namespaced diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java index be7cf9b23bdaa34970a5a27dacdda5f1e16f9dd1..5fcde36a916e6ef622c125da79e56fabcde289f7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java @@ -16,8 +16,11 @@ */ package org.apache.rocketmq.common.rpc; +import org.apache.rocketmq.remoting.CommandCustomHeader; + public class RpcResponse { - private RpcHeader header; + private int code; + private CommandCustomHeader header; private Object body; public RpcException exception; @@ -25,21 +28,26 @@ public class RpcResponse { } - public RpcResponse(RpcHeader header, byte[] body) { + public RpcResponse(int code, CommandCustomHeader header, byte[] body) { + this.code = code; this.header = header; this.body = body; } - public RpcResponse(RpcException rpcException) { - this.header = new RpcHeader(rpcException.getErrorCode()); + RpcResponse(RpcException rpcException) { + this.code = rpcException.getErrorCode(); this.exception = rpcException; } - public RpcHeader getHeader() { + public int getCode() { + return code; + } + + public CommandCustomHeader getHeader() { return header; } - public void setHeader(RpcHeader header) { + public void setHeader(CommandCustomHeader header) { this.header = header; }