提交 8b747f97 编写于 作者: D dongeforever

Try using the new style to handble get min offset

上级 1d7807bb
......@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.CompleteFuture;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
......@@ -103,6 +104,8 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcException;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
......@@ -146,7 +149,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
......@@ -764,15 +770,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (!mappingContext.isLeader()) {
//this may not
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
};
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
......@@ -796,38 +802,40 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
}
long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
} catch (Throwable t) {
log.error("rewriteRequestForStaticTopic failed", t);
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)));
}
}
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
private CompletableFuture<RpcResponse> handleGetMinOffset(RpcRequest request) {
assert request.getCode() == RequestCode.GET_MIN_OFFSET;
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
final GetMinOffsetResponseHeader responseHeader = new GetMinOffsetResponseHeader();
long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
return CompletableFuture.completedFuture(new RpcResponse(ResponseCode.SUCCESS, responseHeader, null));
}
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
try {
CompletableFuture<RpcResponse> responseFuture = handleGetMinOffset(new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null));
RpcResponse rpcResponse = responseFuture.get();
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
......
......@@ -34,7 +34,7 @@ public class RpcResponse {
this.body = body;
}
RpcResponse(RpcException rpcException) {
public RpcResponse(RpcException rpcException) {
this.code = rpcException.getErrorCode();
this.exception = rpcException;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册