From 4e9b097478e7de052a14cd883b47858e6a47b94e Mon Sep 17 00:00:00 2001 From: dongeforever Date: Fri, 19 Nov 2021 20:00:23 +0800 Subject: [PATCH] Check the correctness of logic items --- .../statictopic/LogicQueueMappingItem.java | 2 +- .../statictopic/TopicQueueMappingUtils.java | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 3ab69d7b..01686fde 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -7,7 +7,7 @@ public class LogicQueueMappingItem { private String bname; private long logicOffset; // the start of the logic offset private long startOffset; // the start of the physical offset, included - private long endOffset; // the end of the physical offset, excluded + private long endOffset = -1; // the end of the physical offset, excluded private long timeOfStart = -1; // mutable private long timeOfEnd = -1; // mutable diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 545b7cff..370f0ecb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -162,6 +162,72 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleEntry(maxEpoch, maxNum); } + public static void checkLogicQueueMappingItemOffset(ImmutableList oldItems, ImmutableList newItems) { + if (oldItems == null || oldItems.isEmpty()) { + return; + } + if (newItems == null || newItems.isEmpty() || newItems.size() < oldItems.size()) { + throw new RuntimeException("The new item list is smaller than old ones"); + } + int iold = 0, inew = 0; + while (iold < oldItems.size() && inew < newItems.size()) { + LogicQueueMappingItem newItem = newItems.get(inew); + LogicQueueMappingItem oldItem = oldItems.get(iold); + if (newItem.getGen() < oldItem.getGen()) { + inew++; + continue; + } else if (oldItem.getGen() < newItem.getGen()){ + throw new RuntimeException("The gen is not correct for old item"); + } else { + assert oldItem.getBname().equals(newItem.getBname()); + assert oldItem.getQueueId() == newItem.getQueueId(); + assert oldItem.getStartOffset() == newItem.getStartOffset(); + if (oldItem.getLogicOffset() != -1) { + assert oldItem.getLogicOffset() == newItem.getLogicOffset(); + } + iold++; + inew++; + } + } + } + + + public static void checkLogicQueueMappingItemOffset(ImmutableList items) { + if (items == null + || items.isEmpty()) { + return; + } + int lastGen = -1; + long lastOffset = -1; + for (int i = items.size() - 1; i >=0 ; i--) { + LogicQueueMappingItem item = items.get(i); + if (item.getStartOffset() < 0 + || item.getGen() < 0 + || item.getQueueId() < 0) { + throw new RuntimeException("The field is illegal, should not be negative"); + } + if (lastGen != -1 && item.getGen() >= lastGen) { + throw new RuntimeException("The gen dose not increase monotonically"); + } + + if (item.getEndOffset() != -1 + && item.getEndOffset() < item.getStartOffset()) { + throw new RuntimeException("The endOffset is smaller than the start offset"); + } + + if (lastOffset != -1 && item.getLogicOffset() != -1) { + if (item.getLogicOffset() >= lastOffset) { + throw new RuntimeException("The base logic offset dose not increase monotonically"); + } + if (item.computeMaxStaticQueueOffset() >= lastOffset) { + throw new RuntimeException("The max logic offset dose not increase monotonically"); + } + } + lastGen = item.getGen(); + lastOffset = item.getLogicOffset(); + } + } + public static Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override @@ -178,6 +244,7 @@ public class TopicQueueMappingUtils { } for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { Integer globalid = entry.getKey(); + checkLogicQueueMappingItemOffset(entry.getValue()); String leaderBrokerName = getLeaderBroker(entry.getValue()); if (!leaderBrokerName.equals(mappingDetail.getBname())) { //not the leader -- GitLab