diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 33d3025a3010b040f408eebdbf8a2024dcaa8d7f..a286818c7f5b03fd39aeb86168023f775a3b5f09 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements long offset = -1; for (int i = 0; i < mappingItems.size(); i++) { LogicQueueMappingItem item = mappingItems.get(i); + if (!item.checkIfLogicoffsetDecided()) { + continue; + } if (mappingDetail.getBname().equals(item.getBname())) { - //means the leader - assert i == mappingItems.size() - 1; offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); if (offset > 0) { - offset = item.computeStaticQueueOffsetUpToEnd(offset); + offset = item.computeStaticQueueOffsetStrictly(offset); + break; } } else { requestHeader.setPhysical(true); @@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } - SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader(); + SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader) rpcResponse.getHeader(); if (offsetResponseHeader.getOffset() < 0 || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { continue; } else { - offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); + offset = item.computeStaticQueueOffsetStrictly(offsetResponseHeader.getOffset()); } } @@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } - long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); - offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); + try { + LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true); + assert maxItem != null; + assert maxItem.getLogicOffset() >= 0; + requestHeader.setBname(maxItem.getBname()); + requestHeader.setPhysical(true); + requestHeader.setQueueId(mappingItem.getQueueId()); + long maxPhysicalOffset = Long.MAX_VALUE; + if (maxItem.getBname().equals(mappingDetail.getBname())) { + //current broker + maxPhysicalOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); + } else { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MAX_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + GetMaxOffsetResponseHeader offsetResponseHeader = (GetMaxOffsetResponseHeader) rpcResponse.getHeader(); + maxPhysicalOffset = offsetResponseHeader.getOffset(); + } - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); - final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(maxItem.computeStaticQueueOffsetStrictly(maxPhysicalOffset)); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } } private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, @@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); physicalOffset = offsetResponseHeader.getOffset(); } - long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset); + long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); @@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); //TODO check if it is in current broker RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 6bac707c80cd334379e46aa6ce8c3e93a129e668..9b8135e7aad44910e06e7abfee238112cc9c257c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && nextBeginOffset >= currentItem.getEndOffset()) { nextBeginOffset = currentItem.getEndOffset(); } - responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); + responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset)); //handle min offset - responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset))); + responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset))); //handle max offset - responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset), + responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); //set the offsetDelta responseHeader.setOffsetDelta(currentItem.computeOffsetDelta()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index c229ce36ec105e70eab22fa3b7c2fd751c93c760..bdcba39bdc55c55399640783720a397ac99cffe9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } //no need to care the broker name - long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset()); + long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 713bbf940e50a9e4418381c815a21952ccb719d5..47ffcc2be5c6e8c47c2c448f008125ed7c306675 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; @@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient { case RequestCode.GET_MIN_OFFSET: rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs); break; + case RequestCode.GET_MAX_OFFSET: + rpcResponsePromise = handleGetMaxOffset(addr, request, timeoutMs); + break; case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs); break; @@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } + public Promise handleGetMaxOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); + + RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); + + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + GetMaxOffsetResponseHeader responseHeader = + (GetMaxOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; + } + default:{ + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); + } + } + return rpcResponsePromise; + } + public Promise handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { final Promise rpcResponsePromise = createResponseFuture(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 9f79e9d12d8a19d668e8ba6d17ba6711573e96be..76e74067a6c32b83b74eb4b436bbb2d0d7ad0273 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { this.timeOfEnd = timeOfEnd; } - public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { + + //should only be user in sendMessage and getMinOffset + public long computeStaticQueueOffsetLoosely(long physicalQueueOffset) { //consider the newly mapped item if (logicOffset < 0) { return -1; @@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { return logicOffset + (physicalQueueOffset - startOffset); } - public long computeStaticQueueOffset(long physicalQueueOffset) { - if (logicOffset < 0) { - return logicOffset; - } + public long computeStaticQueueOffsetStrictly(long physicalQueueOffset) { + assert logicOffset >= 0; + if (physicalQueueOffset < startOffset) { return logicOffset; } @@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable { return logicOffset; } } - public boolean checkIfShouldDeleted() { - return endOffset == startOffset; - } - public boolean checkIfEndOffsetDecided() { //if the endOffset == startOffset, then the item should be deleted return endOffset > startOffset; } + public boolean checkIfLogicoffsetDecided() { + return logicOffset >= 0; + } + public long computeOffsetDelta() { return logicOffset - startOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index e56d585371ad26d1f20c25cd3fe019f57b8053ce..8aa157488682926e8039c627c151d45a4d4f18f5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -632,9 +632,6 @@ public class TopicQueueMappingUtils { if (ignoreNegative && item.getLogicOffset() < 0) { continue; } - if (!item.checkIfShouldDeleted()) { - return mappingItems.get(i); - } } return null; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index 288bac466ffd7a8e5180692d295c90ad659b2cfa..eed6b767c25221615152ba2cf1cbd8472c2718a9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; -import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -165,7 +164,7 @@ public class MQAdminUtils { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); } - newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); + newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffsetStrictly(topicOffset.getMaxOffset()), blockSeqSize)); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); //fresh the new leader TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);