diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 270c953aebd3f886007ecf621774ddb39d91b20b..cca96f1785ebf75837a79c63c26e78adffb5c1b6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.CompleteFuture; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; @@ -103,6 +104,8 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.rpc.RpcClientUtils; +import org.apache.rocketmq.common.rpc.RpcException; import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; @@ -146,7 +149,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; @@ -764,15 +770,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } - private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + private CompletableFuture handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { if (mappingContext.getMappingDetail() == null) { return null; } - TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); if (!mappingContext.isLeader()) { //this may not - return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE, + String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())))); }; LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); @@ -796,38 +802,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements } long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset); - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); - final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); + final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader(); responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; + return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null)); } catch (Throwable t) { log.error("rewriteRequestForStaticTopic failed", t); - return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t))); } } - private RemotingCommand getMinOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); - final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); - final GetMinOffsetRequestHeader requestHeader = - (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); - - + private CompletableFuture handleGetMinOffset(RpcRequest request) { + assert request.getCode() == RequestCode.GET_MIN_OFFSET; + GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader(); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); - RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + CompletableFuture rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } - + final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader(); long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; + return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null)); + } + + private RemotingCommand getMinOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final GetMinOffsetRequestHeader requestHeader = + (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + try { + CompletableFuture responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null)); + RpcResponse rpcResponse = responseFuture.get(); + return RpcClientUtils.createCommandForRpcResponse(rpcResponse); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } } private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java index 2f61329c036aec36e4a301e75c326cba28d40304..5155bd241ce3da81f133991851aeae1408d6ef7f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java @@ -34,7 +34,7 @@ public class RpcResponse { this.body = body; } - RpcResponse(RpcException rpcException) { + public RpcResponse(RpcException rpcException) { this.code = rpcException.getErrorCode(); this.exception = rpcException; }