From 86c9f100cfedb040470a6b0aa69e4635323be96c Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 16 Nov 2021 21:56:07 +0800 Subject: [PATCH] Fix code to RpcRequest --- .../processor/AdminBrokerProcessor.java | 9 ++-- .../processor/PullMessageProcessor.java | 3 +- ...GetEarliestMsgStoretimeResponseHeader.java | 3 +- .../header/GetMinOffsetResponseHeader.java | 3 +- .../header/PullMessageResponseHeader.java | 3 +- .../header/SearchOffsetResponseHeader.java | 3 +- .../rocketmq/common/rpc/RequestBuilder.java | 2 - .../rocketmq/common/rpc/RpcClientImpl.java | 16 +++---- .../rocketmq/common/rpc/RpcClientUtils.java | 4 +- .../apache/rocketmq/common/rpc/RpcHeader.java | 46 ------------------- .../rocketmq/common/rpc/RpcRequest.java | 8 +++- .../rocketmq/common/rpc/RpcRequestHeader.java | 4 +- .../rocketmq/common/rpc/RpcResponse.java | 20 +++++--- 13 files changed, 40 insertions(+), 84 deletions(-) delete mode 100644 common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0a8f58cf..35843160 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setTimestamp(timestamp); requestHeader.setQueueId(item.getQueueId()); - requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP); requestHeader.setBname(item.getBname()); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - requestHeader.setCode(RequestCode.GET_MIN_OFFSET); requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); //TODO check if it is leader - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - requestHeader.setCode(RequestCode.GET_MIN_OFFSET); requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); //TODO check if it is leader RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 0cd4668e..df89889c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -145,8 +145,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setBname(bname); - requestHeader.setCode(RequestCode.PULL_MESSAGE); - RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java index 10e7d822..6b9b3b21 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader { +public class GetEarliestMsgStoretimeResponseHeader implements CommandCustomHeader { @CFNotNull private Long timestamp; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java index 57cb0505..6fc0fac3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMinOffsetResponseHeader extends RpcHeader { +public class GetMinOffsetResponseHeader implements CommandCustomHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java index e2f896b7..88af984d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java @@ -20,13 +20,12 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class PullMessageResponseHeader extends RpcHeader { +public class PullMessageResponseHeader implements CommandCustomHeader { @CFNotNull private Long suggestWhichBrokerId; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java index 416f7f73..f88ac685 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java @@ -20,12 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcHeader; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class SearchOffsetResponseHeader extends RpcHeader { +public class SearchOffsetResponseHeader implements CommandCustomHeader { @CFNotNull private Long offset; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index a791543e..4b5c62b8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -25,7 +25,6 @@ public class RequestBuilder { } try { RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); return requestHeader; @@ -53,7 +52,6 @@ public class RequestBuilder { } try { TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setCode(requestCode); requestHeader.setOneway(oneway); requestHeader.setBname(destBrokerName); requestHeader.setTopic(topic); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 8d557a19..245cc3a3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient { String addr = getBrokerAddrByNameOrException(request.getHeader().bname); Promise rpcResponsePromise = null; try { - switch (request.getHeader().getCode()) { + switch (request.getCode()) { case RequestCode.PULL_MESSAGE: rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); break; default: - throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getHeader().getCode()); + throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); } } catch (RpcException rpcException) { throw rpcException; @@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.PULL_OFFSET_MOVED: PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - rpcResponsePromise.setSuccess(new RpcResponse(responseHeader, responseCommand.getBody())); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); default: RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); rpcResponsePromise.setSuccess(rpcResponse); @@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); @@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); @@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); - responseHeader.setCode(responseCommand.getCode()); - return new RpcResponse(responseHeader, responseCommand.getBody()); + return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); } default:{ diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java index df14c225..ebae12b5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java @@ -8,13 +8,13 @@ import java.nio.ByteBuffer; public class RpcClientUtils { public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) { - RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getHeader().getCode(), rpcRequest.getHeader()); + RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader()); cmd.setBody(encodeBody(rpcRequest.getBody())); return cmd; } public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) { - RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getHeader().getCode(), rpcResponse.getHeader()); + RemotingCommand cmd = RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), rpcResponse.getHeader()); cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage()); cmd.setBody(encodeBody(rpcResponse.getBody())); return cmd; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java deleted file mode 100644 index 18b64d56..00000000 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.common.rpc; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class RpcHeader implements CommandCustomHeader { - - - protected int code; - - public RpcHeader() { - } - - public RpcHeader(int code) { - this.code = code; - } - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } - - @Override - public void checkFields() throws RemotingCommandException { - - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java index 3ebe3fe2..90bb696e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java @@ -17,10 +17,12 @@ package org.apache.rocketmq.common.rpc; public class RpcRequest { + int code; private RpcRequestHeader header; private Object body; - public RpcRequest(RpcRequestHeader header, Object body) { + public RpcRequest(int code, RpcRequestHeader header, Object body) { + this.code = code; this.header = header; this.body = body; } @@ -32,4 +34,8 @@ public class RpcRequest { public Object getBody() { return body; } + + public int getCode() { + return code; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java index c5c748d5..577865ec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.common.rpc; -public abstract class RpcRequestHeader extends RpcHeader { +import org.apache.rocketmq.remoting.CommandCustomHeader; + +public abstract class RpcRequestHeader implements CommandCustomHeader { //the namespace name protected String namespace; //if the data has been namespaced diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java index be7cf9b2..5fcde36a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java @@ -16,8 +16,11 @@ */ package org.apache.rocketmq.common.rpc; +import org.apache.rocketmq.remoting.CommandCustomHeader; + public class RpcResponse { - private RpcHeader header; + private int code; + private CommandCustomHeader header; private Object body; public RpcException exception; @@ -25,21 +28,26 @@ public class RpcResponse { } - public RpcResponse(RpcHeader header, byte[] body) { + public RpcResponse(int code, CommandCustomHeader header, byte[] body) { + this.code = code; this.header = header; this.body = body; } - public RpcResponse(RpcException rpcException) { - this.header = new RpcHeader(rpcException.getErrorCode()); + RpcResponse(RpcException rpcException) { + this.code = rpcException.getErrorCode(); this.exception = rpcException; } - public RpcHeader getHeader() { + public int getCode() { + return code; + } + + public CommandCustomHeader getHeader() { return header; } - public void setHeader(RpcHeader header) { + public void setHeader(CommandCustomHeader header) { this.header = header; } -- GitLab