From 82364fc28592980c8215edfaf4e20e254c39a861 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 18 Nov 2021 15:52:54 +0800 Subject: [PATCH] Polish the tools --- .../statictopic/TopicQueueMappingUtils.java | 16 ++++++- .../MigrateTopicLogicalQueueCommand.java | 2 +- .../topic/RemappingStaticTopicSubCommand.java | 24 +++------- .../topic/UpdateStaticTopicSubCommand.java | 47 +++++++++---------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 27dcc0e1..dfb6bbf2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -103,7 +103,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); } - public static Map buildMappingItems(List mappingDetailList, boolean replace) { + public static Map buildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { @@ -111,8 +111,12 @@ public class TopicQueueMappingUtils { } }); + int maxNum = 0; Map globalIdMap = new HashMap(); for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { + if (mappingDetail.totalQueues > maxNum) { + maxNum = mappingDetail.totalQueues; + } for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { Integer globalid = entry.getKey(); String leaderBrokerName = getLeaderBroker(entry.getValue()); @@ -129,6 +133,16 @@ public class TopicQueueMappingUtils { } } } + if (checkConsistence) { + if (maxNum != globalIdMap.size()) { + throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxNum, globalIdMap.size())); + } + for (int i = 0; i < maxNum; i++) { + if (!globalIdMap.containsKey(i)) { + throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); + } + } + } return globalIdMap; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java index 5da8b0d4..a3f212c8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java @@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements SubCommand { String toBrokerName, Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException { TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); - LogicalQueuesInfo logicalQueuesInfo = topicRouteInfo.getLogicalQueuesInfo(); + LogicalQueuesInfo logicalQueuesInfo = null; /*topicRouteInfo.getLogicalQueuesInfo();*/ if (logicalQueuesInfo == null) { throw new SubCommandException("topic not enabled logical queue"); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index edae005c..472ce0b4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -172,26 +172,15 @@ public class RemappingStaticTopicSubCommand implements SubCommand { List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); //check the epoch and qnum maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; - detailList.forEach( mappingDetail -> { - if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); + for (TopicQueueMappingDetail mappingDetail : detailList) { + if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); } - if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); - } - }); - - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); - - if (maxEpochAndNum.getValue() != globalIdMap.size()) { - throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); - } - for (int i = 0; i < maxEpochAndNum.getValue(); i++) { - if (!globalIdMap.containsKey(i)) { - throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); + if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); } } + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true); //the check is ok, now do the mapping allocation int maxNum = maxEpochAndNum.getValue(); @@ -262,6 +251,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { configMapping.getMappingDetail().setEpoch(epoch); configMapping.getMappingDetail().setTotalQueues(maxNum); }); + TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true); // now do the remapping //Step1: let the new leader can be write without the logicOffset for (String broker: brokersToMapIn) { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index d25fb70d..65c73772 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -64,7 +64,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Option opt = null; - opt = new Option("c", "clusters", true, "create topic to clusters, comma separated"); + opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated"); + optionGroup.addOption(opt); + + opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated"); optionGroup.addOption(opt); optionGroup.setRequired(true); @@ -94,22 +97,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Set brokers = new HashSet<>(); try { - if (!commandLine.hasOption('t') - || !commandLine.hasOption('c') + if ((!commandLine.hasOption("b") && !commandLine.hasOption('c')) + || !commandLine.hasOption('t') || !commandLine.hasOption("qn")) { ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); return; } - String topic = commandLine.getOptionValue('t').trim(); - int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); - String clusters = commandLine.getOptionValue('c').trim(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); if (clusterInfo == null || clusterInfo.getClusterAddrTable().isEmpty()) { throw new RuntimeException("The Cluster info is empty"); - } else { - clientMetadata.refreshClusterInfo(clusterInfo); } + clientMetadata.refreshClusterInfo(clusterInfo); + + String topic = commandLine.getOptionValue('t').trim(); + int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); + String clusters = commandLine.getOptionValue('c').trim(); for (String cluster : clusters.split(",")) { cluster = cluster.trim(); if (clusterInfo.getClusterAddrTable().get(cluster) != null) { @@ -163,26 +167,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand { List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); //check the epoch and qnum maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; - detailList.forEach( mappingDetail -> { - if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); - } - if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); + for (TopicQueueMappingDetail mappingDetail : detailList) { + if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); } - }); - - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); - - if (maxEpochAndNum.getValue() != globalIdMap.size()) { - throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); - } - for (int i = 0; i < maxEpochAndNum.getValue(); i++) { - if (!globalIdMap.containsKey(i)) { - throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); + if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); } } + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true); } if (queueNum < globalIdMap.size()) { throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); @@ -230,10 +223,14 @@ public class UpdateStaticTopicSubCommand implements SubCommand { LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); } + + //double check the topic config map existedTopicConfigMap.values().forEach( configMapping -> { configMapping.getMappingDetail().setEpoch(epoch); configMapping.getMappingDetail().setTotalQueues(queueNum); }); + TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true); + //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : existedTopicConfigMap.entrySet()) { String broker = entry.getKey(); -- GitLab