From a5af2cf519e7c2c2a306cc326926100fc75f57d3 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 17 Nov 2021 23:05:03 +0800 Subject: [PATCH] Finish the UpdateStaticTopicSubCommand --- .../apache/rocketmq/common/TopicConfig.java | 6 + .../common/TopicConfigAndQueueMapping.java | 11 +- .../common/TopicQueueMappingInfo.java | 11 +- .../common/TopicQueueMappingUtils.java | 59 ++++-- .../topic/UpdateStaticTopicSubCommand.java | 192 +++++++++++------- 5 files changed, 181 insertions(+), 98 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index c082ba66..ec4d54bc 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -47,6 +47,12 @@ public class TopicConfig { this.order = other.order; } + public TopicConfig(String topicName, int readQueueNums, int writeQueueNums) { + this.topicName = topicName; + this.readQueueNums = readQueueNums; + this.writeQueueNums = writeQueueNums; + } + public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) { this.topicName = topicName; this.readQueueNums = readQueueNums; diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java index 3bc7f24d..e2eece67 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java @@ -17,18 +17,17 @@ package org.apache.rocketmq.common; public class TopicConfigAndQueueMapping extends TopicConfig { - private TopicQueueMappingDetail topicQueueMappingDetail; + private TopicQueueMappingDetail mappingDetail; public TopicConfigAndQueueMapping() { } - public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) { + public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) { super(topicConfig); - this.topicQueueMappingDetail = topicQueueMappingDetail; + this.mappingDetail = mappingDetail; } - - public TopicQueueMappingDetail getTopicQueueMappingInfo() { - return topicQueueMappingDetail; + public TopicQueueMappingDetail getMappingDetail() { + return mappingDetail; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index b2e85911..b4bf7760 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -55,9 +55,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return totalQueues; } - public void setTotalQueues(int totalQueues) { - this.totalQueues = totalQueues; - } public String getBname() { return bname; @@ -71,6 +68,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return epoch; } + public void setEpoch(int epoch) { + this.epoch = epoch; + } + + public void setTotalQueues(int totalQueues) { + this.totalQueues = totalQueues; + } + public ConcurrentMap getCurrIdMap() { return currIdMap; } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java index 93345df5..87c54a33 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java @@ -29,16 +29,18 @@ import java.util.Random; public class TopicQueueMappingUtils { - public static class MappingState { + public static class MappingAllocator { Map brokerNumMap = new HashMap(); + Map idToBroker = new HashMap(); int currentIndex = 0; Random random = new Random(); List leastBrokers = new ArrayList(); - private MappingState(Map brokerNumMap) { + private MappingAllocator(Map idToBroker, Map brokerNumMap) { + this.idToBroker.putAll(idToBroker); this.brokerNumMap.putAll(brokerNumMap); } - public void freshState() { + private void freshState() { int minNum = -1; for (Map.Entry entry : brokerNumMap.entrySet()) { if (entry.getValue() > minNum) { @@ -50,20 +52,41 @@ public class TopicQueueMappingUtils { } currentIndex = random.nextInt(leastBrokers.size()); } - - public String nextBroker() { + private String nextBroker() { if (leastBrokers.isEmpty()) { freshState(); } - int tmpIndex = (++currentIndex) % leastBrokers.size(); - String broker = leastBrokers.remove(tmpIndex); - currentIndex--; - return broker; + int tmpIndex = currentIndex % leastBrokers.size(); + return leastBrokers.remove(tmpIndex); + } + + public Map getBrokerNumMap() { + return brokerNumMap; + } + + public void upToNum(int maxQueueNum) { + int currSize = idToBroker.size(); + if (maxQueueNum <= currSize) { + return; + } + for (int i = currSize; i < maxQueueNum; i++) { + String nextBroker = nextBroker(); + if (brokerNumMap.containsKey(nextBroker)) { + brokerNumMap.put(nextBroker, brokerNumMap.get(nextBroker) + 1); + } else { + brokerNumMap.put(nextBroker, 1); + } + idToBroker.put(i, nextBroker); + } + } + + public Map getIdToBroker() { + return idToBroker; } } - public static MappingState buildMappingState(Map brokerNumMap) { - return new MappingState(brokerNumMap); + public static MappingAllocator buildMappingAllocator(Map idToBroker, Map brokerNumMap) { + return new MappingAllocator(idToBroker, brokerNumMap); } public static Map.Entry findMaxEpochAndQueueNum(List mappingDetailList) { @@ -92,14 +115,14 @@ public class TopicQueueMappingUtils { for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { Integer globalid = entry.getKey(); - String leaerBrokerName = entry.getValue().iterator().next().getBname(); - if (!leaerBrokerName.equals(mappingDetail.getBname())) { + String leaderBrokerName = getLeaderBroker(entry.getValue()); + if (!leaderBrokerName.equals(mappingDetail.getBname())) { //not the leader continue; } if (globalIdMap.containsKey(globalid)) { if (!replace) { - throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname())); + throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname())); } } else { globalIdMap.put(globalid, entry.getValue()); @@ -108,4 +131,12 @@ public class TopicQueueMappingUtils { } return globalIdMap; } + + public static String getLeaderBroker(ImmutableList items) { + return getLeaderItem(items).getBname(); + } + public static LogicQueueMappingItem getLeaderItem(ImmutableList items) { + assert items.size() > 0; + return items.get(items.size() - 1); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 253d19c9..a1d3f6ff 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -22,6 +22,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.LogicQueueMappingItem; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.TopicQueueMappingUtils; @@ -35,11 +36,13 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class UpdateStaticTopicSubCommand implements SubCommand { @@ -77,18 +80,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { return options; } - private void validateIfNull(Map.Entry entry, boolean shouldNull) { - if (shouldNull) { - if (entry.getValue().getTopicQueueMappingInfo() != null) { - throw new RuntimeException("Mapping info should be null in broker " + entry.getKey()); - } - } else { - if (entry.getValue().getTopicQueueMappingInfo() == null) { - throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey()); - } - } - } - @Override public void execute(final CommandLine commandLine, final Options options, @@ -100,73 +91,88 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Map existedTopicConfigMap = new HashMap<>(); Map> globalIdMap = new HashMap<>(); try { - + if (!commandLine.hasOption('t') + || !commandLine.hasOption('c') + || !commandLine.hasOption("qn")) { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + return; + } String topic = commandLine.getOptionValue('t').trim(); int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); - String cluster = commandLine.getOptionValue('c').trim(); + String clusters = commandLine.getOptionValue('c').trim(); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); if (clusterInfo == null - || clusterInfo.getClusterAddrTable().isEmpty() - || clusterInfo.getClusterAddrTable().get(cluster) == null - || clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) { - throw new RuntimeException("The Cluster info is null for " + cluster); + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } else { + clientMetadata.refreshClusterInfo(clusterInfo); } - clientMetadata.refreshClusterInfo(clusterInfo); - //first get the existed topic config and mapping - { - TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); - if (routeData != null + Set brokers = new HashSet<>(); + for (String cluster : clusters.split(",")) { + cluster = cluster.trim(); + if (clusterInfo.getClusterAddrTable().get(cluster) != null) { + brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster)); + } + } + if (brokers.isEmpty()) { + throw new RuntimeException("Find none brokers for " + clusters); + } + + //get the existed topic config and mapping + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null && !routeData.getQueueDatas().isEmpty()) { - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); - //allow the mapping info is null - if (mapping != null) { - existedTopicConfigMap.put(bname, mapping); - } + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + existedTopicConfigMap.put(bname, mapping); } } } - // the - { - if (!existedTopicConfigMap.isEmpty()) { - //make sure it it not null - existedTopicConfigMap.entrySet().forEach(entry -> { - validateIfNull(entry, false); - }); - //make sure the detail is not dirty - existedTopicConfigMap.entrySet().forEach(entry -> { - if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) { - throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname())); - } - if (entry.getValue().getTopicQueueMappingInfo().isDirty()) { - throw new RuntimeException("The mapping info is dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname()); - } - }); - - List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList()); - //check the epoch and qnum - Map.Entry maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - detailList.forEach( mappingDetail -> { - if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); - } - if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); - } - }); - - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); - - if (maxEpochAndNum.getValue() != globalIdMap.size()) { - throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); + + Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum); + if (!existedTopicConfigMap.isEmpty()) { + //make sure it it not null + existedTopicConfigMap.forEach((key, value) -> { + if (value.getMappingDetail() != null) { + throw new RuntimeException("Mapping info should be null in broker " + key); + } + }); + //make sure the detail is not dirty + existedTopicConfigMap.forEach((key, value) -> { + if (!key.equals(value.getMappingDetail().getBname())) { + throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname())); } - for (int i = 0; i < maxEpochAndNum.getValue(); i++) { - if (!globalIdMap.containsKey(i)) { - throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); - } + if (value.getMappingDetail().isDirty()) { + throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname()); + } + }); + + List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); + //check the epoch and qnum + maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); + final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; + detailList.forEach( mappingDetail -> { + if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); + } + if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); + } + }); + + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); + + if (maxEpochAndNum.getValue() != globalIdMap.size()) { + throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); + } + for (int i = 0; i < maxEpochAndNum.getValue(); i++) { + if (!globalIdMap.containsKey(i)) { + throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); } } } @@ -177,11 +183,47 @@ public class UpdateStaticTopicSubCommand implements SubCommand { if (queueNum == globalIdMap.size()) { throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing"); } - //the check is ok, now do the real - - - - ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + //the check is ok, now do the mapping allocation + Map brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)); + Map idToBroker = new HashMap<>(); + globalIdMap.forEach((key, value) -> { + String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value); + idToBroker.put(key, leaderbroker); + if (!brokerNumMap.containsKey(leaderbroker)) { + brokerNumMap.put(leaderbroker, 1); + } else { + brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1); + } + }); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap); + allocator.upToNum(queueNum); + Map newIdToBroker = allocator.getIdToBroker(); + + //construct the topic configAndMapping + int epoch = maxEpochAndNum.getKey() + 1; + newIdToBroker.forEach( (queueId, broker) -> { + TopicConfigAndQueueMapping configMapping; + if (!existedTopicConfigMap.containsKey(broker)) { + TopicConfig topicConfig = new TopicConfig(topic, 1, 1); + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch); + configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); + } else { + configMapping = existedTopicConfigMap.get(broker); + configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.getMappingDetail().setEpoch(epoch); + configMapping.getMappingDetail().setTotalQueues(queueNum); + } + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); + configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); + }); + + for (Map.Entry entry : existedTopicConfigMap.entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { -- GitLab