From 5ce093e13291b699192825181212d0a438a61e2f Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 25 Nov 2021 19:24:35 +0800 Subject: [PATCH] Add test for remapping static topic --- .../statictopic/TopicQueueMappingOne.java | 28 ++++++++ .../statictopic/TopicQueueMappingUtils.java | 51 ++++++++++---- .../statictopic/TopicMappingUtilsTest.java | 70 ++++++++++++++++--- .../rocketmq/test/smoke/StaticTopicIT.java | 2 +- .../topic/RemappingStaticTopicSubCommand.java | 16 +---- .../topic/UpdateStaticTopicSubCommand.java | 12 +--- 6 files changed, 129 insertions(+), 50 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java index 801cd67e..319e113d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.statictopic; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.List; @@ -49,4 +51,30 @@ public class TopicQueueMappingOne extends RemotingSerializable { public List getItems() { return items; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof TopicQueueMappingOne)) return false; + + TopicQueueMappingOne that = (TopicQueueMappingOne) o; + + return new EqualsBuilder() + .append(topic, that.topic) + .append(bname, that.bname) + .append(globalId, that.globalId) + .append(items, that.items) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(topic) + .append(bname) + .append(globalId) + .append(items) + .toHashCode(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index d1d81c68..ea6bc611 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.common.statictopic; -import com.google.common.collect.ImmutableList; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -122,7 +121,7 @@ public class TopicQueueMappingUtils { return detailList; } - public static Map.Entry checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map brokerConfigMap) { + public static Map.Entry checkNameEpochNumConsistence(String topic, Map brokerConfigMap) { if (brokerConfigMap == null || brokerConfigMap.isEmpty()) { return null; @@ -212,6 +211,11 @@ public class TopicQueueMappingUtils { || item.getQueueId() < 0) { throw new RuntimeException("The field is illegal, should not be negative"); } + if (items.size() >= 2 + && i <= items.size() - 2 + && items.get(i).getLogicOffset() < 0) { + throw new RuntimeException("The non-latest item has negative logic offset"); + } if (lastGen != -1 && item.getGen() >= lastGen) { throw new RuntimeException("The gen dose not increase monotonically"); } @@ -249,6 +253,14 @@ public class TopicQueueMappingUtils { } } + public static void checkLeaderInTargetBrokers(Collection mappingOnes, Set targetBrokers) { + for (TopicQueueMappingOne mappingOne : mappingOnes) { + if (!targetBrokers.contains(mappingOne.bname)) { + throw new RuntimeException("The leader broker does not in target broker"); + } + } + } + public static void checkPhysicalQueueConsistence(Map brokerConfigMap) { for (Map.Entry entry : brokerConfigMap.entrySet()) { TopicConfigAndQueueMapping configMapping = entry.getValue(); @@ -264,7 +276,7 @@ public class TopicQueueMappingUtils { } TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); if (topicConfig == null) { - throw new RuntimeException("The broker dose not exist"); + throw new RuntimeException("The broker of item dose not exist"); } if (item.getQueueId() >= topicConfig.getWriteQueueNums()) { throw new RuntimeException("The physical queue id is overflow the write queues"); @@ -274,6 +286,8 @@ public class TopicQueueMappingUtils { } } + + public static Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override @@ -366,7 +380,7 @@ public class TopicQueueMappingUtils { Map globalIdMap = new HashMap(); Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { - maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); checkIfReusePhysicalQueue(globalIdMap.values()); checkPhysicalQueueConsistence(brokerConfigMap); @@ -430,8 +444,8 @@ public class TopicQueueMappingUtils { } //double check the config { - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); - globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true); checkIfReusePhysicalQueue(globalIdMap.values()); checkPhysicalQueueConsistence(brokerConfigMap); } @@ -440,8 +454,11 @@ public class TopicQueueMappingUtils { public static TopicRemappingDetailWrapper remappingStaticTopic(String topic, Map brokerConfigMap, Set targetBrokers) { - final Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); - final Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); + //the check is ok, now do the mapping allocation int maxNum = maxEpochAndNum.getValue(); @@ -449,7 +466,6 @@ public class TopicQueueMappingUtils { for (String broker: targetBrokers) { brokerNumMap.put(broker, 0); } - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap(), brokerNumMap); allocator.upToNum(maxNum); Map expectedBrokerNumMap = allocator.getBrokerNumMap(); @@ -504,12 +520,17 @@ public class TopicQueueMappingUtils { TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker); TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker); + if (mapInConfig == null) { + mapInConfig = new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, maxNum, mapInBroker, newEpoch)); + brokerConfigMap.put(mapInBroker, mapInConfig); + } + mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); - mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); + mapInConfig.setReadQueueNums(mapInConfig.getReadQueueNums() + 1); List items = new ArrayList(topicQueueMappingOne.getItems()); LogicQueueMappingItem last = items.get(items.size() - 1); - items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1)); + items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1)); //Use the same object TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items); @@ -524,10 +545,12 @@ public class TopicQueueMappingUtils { //double check { - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); + TopicQueueMappingUtils.checkLeaderInTargetBrokers(globalIdMap.values(), targetBrokers); } - - return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut); } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java index 14e4c0f2..1b0ad54c 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -15,11 +15,14 @@ import java.util.Set; public class TopicMappingUtilsTest { - private Set buildTargetBrokers(int num) { + return buildTargetBrokers(num, ""); + } + + private Set buildTargetBrokers(int num, String suffix) { Set brokers = new HashSet(); for (int i = 0; i < num; i++) { - brokers.add("broker" + i); + brokers.add("broker" + suffix + i); } return brokers; } @@ -102,7 +105,7 @@ public class TopicMappingUtilsTest { Assert.assertEquals(2 * i, brokerConfigMap.size()); //do the check manually - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); @@ -125,7 +128,52 @@ public class TopicMappingUtilsTest { } @Test - public void testCreateStaticTopic_Error() { + public void testRemappingStaticTopic() { + String topic = "static"; + int queueNum = 7; + Map brokerConfigMap = new HashMap(); + Set originalBrokers = buildTargetBrokers(2); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, originalBrokers, brokerConfigMap); + Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); + Assert.assertEquals(2, brokerConfigMap.size()); + + { + //do the check manually + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); + } + + for (int i = 0; i < 10; i++) { + Set targetBrokers = buildTargetBrokers(2, "test" + i); + TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); + //do the check manually + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); + TopicQueueMappingUtils.checkLeaderInTargetBrokers(globalIdMap.values(), targetBrokers); + + Assert.assertEquals((i + 2) * 2, brokerConfigMap.size()); + + //check and complete the logicOffset + for (Map.Entry entry : brokerConfigMap.entrySet()) { + TopicConfigAndQueueMapping configMapping = entry.getValue(); + if (!targetBrokers.contains(configMapping.getMappingDetail().bname)) { + continue; + } + for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { + Assert.assertEquals(i + 2, items.size()); + items.get(items.size() - 1).setLogicOffset(i + 1); + } + } + } + } + + + @Test + public void testUtilsCheck() { String topic = "static"; int queueNum = 10; Map brokerConfigMap = new HashMap(); @@ -135,33 +183,33 @@ public class TopicMappingUtilsTest { Assert.assertEquals(2, brokerConfigMap.size()); TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); List items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next(); - Map.Entry maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + Map.Entry maxEpochNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); int exceptionNum = 0; try { configMapping.getMappingDetail().setTopic("xxxx"); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } catch (RuntimeException ignore) { exceptionNum++; configMapping.getMappingDetail().setTopic(topic); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } try { configMapping.getMappingDetail().setTotalQueues(1); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } catch (RuntimeException ignore) { exceptionNum++; configMapping.getMappingDetail().setTotalQueues(10); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } try { configMapping.getMappingDetail().setEpoch(0); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } catch (RuntimeException ignore) { exceptionNum++; configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey()); - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index 9d3e0c38..a5656f31 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -72,7 +72,7 @@ public class StaticTopicIT extends BaseConf { Assert.assertNotNull(localConfigMapping); Assert.assertEquals(configMapping, localConfigMapping); } - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, remoteBrokerConfigMap); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index 019fad5b..eac75188 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -16,22 +16,15 @@ */ package org.apache.rocketmq.tools.command.topic; -import com.google.common.collect.ImmutableList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; -import org.apache.rocketmq.common.admin.TopicOffset; -import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; @@ -40,16 +33,11 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -import java.util.AbstractMap; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.stream.Collectors; public class RemappingStaticTopicSubCommand implements SubCommand { @@ -108,7 +96,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { String mapData = MixAll.file2String(mapFileName); TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); //double check the config - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap()); TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); @@ -202,7 +190,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { if (brokerConfigMap.isEmpty()) { throw new RuntimeException("No topic route to do the remapping"); } - Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); { TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet(), new HashSet()); String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 5a4fc043..85e2cc58 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -16,21 +16,14 @@ */ package org.apache.rocketmq.tools.command.topic; -import com.google.common.collect.ImmutableList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; -import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; @@ -45,7 +38,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class UpdateStaticTopicSubCommand implements SubCommand { @@ -107,7 +99,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { String mapData = MixAll.file2String(mapFileName); TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); //double check the config - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, wrapper.getBrokerConfigMap()); boolean force = false; if (commandLine.hasOption("fr") && Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) { force = true; @@ -215,7 +207,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { - maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + maxEpochAndNum = TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, brokerConfigMap); } { -- GitLab