diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 9f525f6d67a2b03e9289c84e3687b5966c28bbdd..0a8f58cfaa9699595633a96389349b99b14b59d3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -646,8 +646,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setTimestamp(timestamp); requestHeader.setQueueId(item.getQueueId()); - RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().searchOffset(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP); + requestHeader.setBname(item.getBname()); + RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } @@ -751,8 +753,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + requestHeader.setCode(RequestCode.GET_MIN_OFFSET); + requestHeader.setBname(mappingItem.getBname()); + requestHeader.setPhysical(true); + //TODO check if it is leader + RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } @@ -802,8 +808,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + requestHeader.setCode(RequestCode.GET_MIN_OFFSET); + requestHeader.setBname(mappingItem.getBname()); + requestHeader.setPhysical(true); + RpcRequest rpcRequest = new RpcRequest(requestHeader, null); + //TODO check if it is leader + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); }