From 4f96f72c1c24b557ec5998a6985a353f64ec5f6c Mon Sep 17 00:00:00 2001 From: dongeforever Date: Fri, 3 Dec 2021 15:12:11 +0800 Subject: [PATCH] Fix the max offset logic for logicOffset = -1 --- .../processor/AdminBrokerProcessor.java | 53 +++++++++++++------ .../processor/PullMessageProcessor.java | 6 +-- .../processor/SendMessageProcessor.java | 2 +- .../rocketmq/common/rpc/RpcClientImpl.java | 25 +++++++++ .../statictopic/LogicQueueMappingItem.java | 19 +++---- .../statictopic/TopicQueueMappingUtils.java | 3 -- .../rocketmq/tools/admin/MQAdminUtils.java | 3 +- 7 files changed, 78 insertions(+), 33 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 33d3025a..a286818c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements long offset = -1; for (int i = 0; i < mappingItems.size(); i++) { LogicQueueMappingItem item = mappingItems.get(i); + if (!item.checkIfLogicoffsetDecided()) { + continue; + } if (mappingDetail.getBname().equals(item.getBname())) { - //means the leader - assert i == mappingItems.size() - 1; offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); if (offset > 0) { - offset = item.computeStaticQueueOffsetUpToEnd(offset); + offset = item.computeStaticQueueOffsetStrictly(offset); + break; } } else { requestHeader.setPhysical(true); @@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } - SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader(); + SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader) rpcResponse.getHeader(); if (offsetResponseHeader.getOffset() < 0 || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { continue; } else { - offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); + offset = item.computeStaticQueueOffsetStrictly(offsetResponseHeader.getOffset()); } } @@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } - long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); - offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); + try { + LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true); + assert maxItem != null; + assert maxItem.getLogicOffset() >= 0; + requestHeader.setBname(maxItem.getBname()); + requestHeader.setPhysical(true); + requestHeader.setQueueId(mappingItem.getQueueId()); + long maxPhysicalOffset = Long.MAX_VALUE; + if (maxItem.getBname().equals(mappingDetail.getBname())) { + //current broker + maxPhysicalOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); + } else { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MAX_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + GetMaxOffsetResponseHeader offsetResponseHeader = (GetMaxOffsetResponseHeader) rpcResponse.getHeader(); + maxPhysicalOffset = offsetResponseHeader.getOffset(); + } - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); - final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(maxItem.computeStaticQueueOffsetStrictly(maxPhysicalOffset)); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } } private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, @@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); physicalOffset = offsetResponseHeader.getOffset(); } - long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset); + long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); @@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); //TODO check if it is in current broker RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 6bac707c..9b8135e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && nextBeginOffset >= currentItem.getEndOffset()) { nextBeginOffset = currentItem.getEndOffset(); } - responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); + responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset)); //handle min offset - responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset))); + responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset))); //handle max offset - responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset), + responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); //set the offsetDelta responseHeader.setOffsetDelta(currentItem.computeOffsetDelta()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index c229ce36..bdcba39b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } //no need to care the broker name - long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset()); + long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 713bbf94..47ffcc2b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; @@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient { case RequestCode.GET_MIN_OFFSET: rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs); break; + case RequestCode.GET_MAX_OFFSET: + rpcResponsePromise = handleGetMaxOffset(addr, request, timeoutMs); + break; case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs); break; @@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } + public Promise handleGetMaxOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); + + RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); + + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + GetMaxOffsetResponseHeader responseHeader = + (GetMaxOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; + } + default:{ + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); + } + } + return rpcResponsePromise; + } + public Promise handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { final Promise rpcResponsePromise = createResponseFuture(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 9f79e9d1..76e74067 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { this.timeOfEnd = timeOfEnd; } - public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { + + //should only be user in sendMessage and getMinOffset + public long computeStaticQueueOffsetLoosely(long physicalQueueOffset) { //consider the newly mapped item if (logicOffset < 0) { return -1; @@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { return logicOffset + (physicalQueueOffset - startOffset); } - public long computeStaticQueueOffset(long physicalQueueOffset) { - if (logicOffset < 0) { - return logicOffset; - } + public long computeStaticQueueOffsetStrictly(long physicalQueueOffset) { + assert logicOffset >= 0; + if (physicalQueueOffset < startOffset) { return logicOffset; } @@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable { return logicOffset; } } - public boolean checkIfShouldDeleted() { - return endOffset == startOffset; - } - public boolean checkIfEndOffsetDecided() { //if the endOffset == startOffset, then the item should be deleted return endOffset > startOffset; } + public boolean checkIfLogicoffsetDecided() { + return logicOffset >= 0; + } + public long computeOffsetDelta() { return logicOffset - startOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index e56d5853..8aa15748 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -632,9 +632,6 @@ public class TopicQueueMappingUtils { if (ignoreNegative && item.getLogicOffset() < 0) { continue; } - if (!item.checkIfShouldDeleted()) { - return mappingItems.get(i); - } } return null; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index 288bac46..eed6b767 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; -import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -165,7 +164,7 @@ public class MQAdminUtils { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); } - newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); + newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffsetStrictly(topicOffset.getMaxOffset()), blockSeqSize)); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); //fresh the new leader TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items); -- GitLab