diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 678b1f54a15d2048469a79028f33d90a8a0296cc..9056998cd8353f68d0821d35127c0648d6c9f613 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -16,16 +16,17 @@ */ package org.apache.rocketmq.broker.client.rebalance; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.message.MessageQueue; public class RebalanceLockManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 5f54889a93e67b1408b7b5e1c3b49ba52d1c6692..34f9aadb2c0c7946673adf5214c0f4c83c6b0f97 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -696,6 +696,28 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private RemotingCommand rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null + || !mappingDetail.getBname().equals(mappingItem.getBname())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + } + long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); + + offset = mappingItem.computeStaticQueueOffset(offset); + + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); @@ -703,33 +725,13 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final GetMaxOffsetRequestHeader requestHeader = (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); - String topic = requestHeader.getTopic(); - int queueId = requestHeader.getQueueId(); - - if (requestHeader.getLogicalQueue()) { - LogicalQueuesInfoInBroker logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic); - if (logicalQueuesInfo != null) { - // max offset must be in the queue route with largest offset - LogicalQueueRouteData requestLogicalQueueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, Long.MAX_VALUE); - if (requestLogicalQueueRouteData != null) { - logicalQueuesInfo.readLock().lock(); - try { - List queueRouteDataList = logicalQueuesInfo.get(requestLogicalQueueRouteData.getLogicalQueueIndex()); - if (queueRouteDataList != null && !queueRouteDataList.isEmpty()) { - LogicalQueueRouteData selectedLogicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1); - if (!Objects.equals(selectedLogicalQueueRouteData.getMessageQueue(), new MessageQueue(topic, this.brokerController.getBrokerConfig().getBrokerName(), queueId))) { - log.info("getMaxOffset topic={} queueId={} not latest, redirect: {}", topic, queueId, selectedLogicalQueueRouteData); - response.addExtField(MessageConst.PROPERTY_REDIRECT, "1"); - } - } - } finally { - logicalQueuesInfo.readLock().unlock(); - } - } - } + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; } - long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, requestHeader.isCommitted()); + long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); @@ -738,6 +740,36 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null + || !mappingDetail.getBname().equals(mappingItem.getBname())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + }; + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); + long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset()); + + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); + final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + }catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); @@ -745,6 +777,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); @@ -753,6 +791,35 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + if (mappingItem == null + || !mappingDetail.getBname().equals(mappingItem.getBname())) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); + }; + try { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + GetEarliestMsgStoretimeResponseHeader offsetResponseHeader = (GetEarliestMsgStoretimeResponseHeader) rpcResponse.getHeader(); + + final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); + final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); + responseHeader.setTimestamp(offsetResponseHeader.getTimestamp()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + }catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); @@ -760,6 +827,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final GetEarliestMsgStoretimeRequestHeader requestHeader = (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); + if (rewriteResult != null) { + return rewriteResult; + } + long timestamp = this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 67fd9372dea869ba0e1b437d59d9955738d8c123..2d9f17dbbe57e35ea19d14a3b24a1ab62df89dc9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1126,7 +1126,6 @@ public class MQClientAPIImpl { requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); requestHeader.setCommitted(committed); - requestHeader.setLogicalQueue(fromLogicalQueue); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java index c64381fb7870cdb450af2cb1c7f7b6f170c6eba7..fea17362efd2de4dd208c967f5d2662d35faafec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader { +public class GetEarliestMsgStoretimeRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String topic; @CFNotNull @@ -34,18 +34,22 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader public void checkFields() throws RemotingCommandException { } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java index 6963195f9121450d376056d029252b0811fece86..e4226c20ece96e1d3e288bf4437f31f9327296d2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java @@ -20,34 +20,37 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMaxOffsetRequestHeader implements CommandCustomHeader { +public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String topic; @CFNotNull private Integer queueId; private boolean committed; - private boolean logicalQueue; @Override public void checkFields() throws RemotingCommandException { } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; } @@ -59,12 +62,4 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader { public boolean isCommitted() { return committed; } - - public void setLogicalQueue(boolean logicalQueue) { - this.logicalQueue = logicalQueue; - } - - public boolean getLogicalQueue() { - return logicalQueue; - } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java index 6fb8ed40c459ff801e0eb49c0e0d86f160ee1a41..6889ae8fda584df4869d2068ef70dff9e2bc985e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java @@ -20,11 +20,11 @@ */ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetMinOffsetRequestHeader implements CommandCustomHeader { +public class GetMinOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private String topic; @CFNotNull @@ -34,18 +34,22 @@ public class GetMinOffsetRequestHeader implements CommandCustomHeader { public void checkFields() throws RemotingCommandException { } + @Override public String getTopic() { return topic; } + @Override public void setTopic(String topic) { this.topic = topic; } + @Override public Integer getQueueId() { return queueId; } + @Override public void setQueueId(Integer queueId) { this.queueId = queueId; }