From 295a6bde63210d73570dcdfc71ccd100a3c87444 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 17 Nov 2021 21:49:27 +0800 Subject: [PATCH] Polish the update utils --- .../common/TopicQueueMappingUtils.java | 111 ++++++++++++++++++ .../topic/UpdateStaticTopicSubCommand.java | 78 ++++++------ 2 files changed, 149 insertions(+), 40 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java new file mode 100644 index 00000000..93345df5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import com.google.common.collect.ImmutableList; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class TopicQueueMappingUtils { + + public static class MappingState { + Map brokerNumMap = new HashMap(); + int currentIndex = 0; + Random random = new Random(); + List leastBrokers = new ArrayList(); + private MappingState(Map brokerNumMap) { + this.brokerNumMap.putAll(brokerNumMap); + } + + public void freshState() { + int minNum = -1; + for (Map.Entry entry : brokerNumMap.entrySet()) { + if (entry.getValue() > minNum) { + leastBrokers.clear(); + leastBrokers.add(entry.getKey()); + } else if (entry.getValue() == minNum) { + leastBrokers.add(entry.getKey()); + } + } + currentIndex = random.nextInt(leastBrokers.size()); + } + + public String nextBroker() { + if (leastBrokers.isEmpty()) { + freshState(); + } + int tmpIndex = (++currentIndex) % leastBrokers.size(); + String broker = leastBrokers.remove(tmpIndex); + currentIndex--; + return broker; + } + } + + public static MappingState buildMappingState(Map brokerNumMap) { + return new MappingState(brokerNumMap); + } + + public static Map.Entry findMaxEpochAndQueueNum(List mappingDetailList) { + int epoch = -1; + int queueNum = 0; + for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { + if (mappingDetail.getEpoch() > epoch) { + epoch = mappingDetail.getEpoch(); + } + if (mappingDetail.getTotalQueues() > queueNum) { + queueNum = mappingDetail.getTotalQueues(); + } + } + return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); + } + + public static Map> buildMappingItems(List mappingDetailList, boolean replace) { + Collections.sort(mappingDetailList, new Comparator() { + @Override + public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { + return o2.getEpoch() - o1.getEpoch(); + } + }); + + Map> globalIdMap = new HashMap>(); + for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { + for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { + Integer globalid = entry.getKey(); + String leaerBrokerName = entry.getValue().iterator().next().getBname(); + if (!leaerBrokerName.equals(mappingDetail.getBname())) { + //not the leader + continue; + } + if (globalIdMap.containsKey(globalid)) { + if (!replace) { + throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname())); + } + } else { + globalIdMap.put(globalid, entry.getValue()); + } + } + } + return globalIdMap; + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 05f28073..253d19c9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.Options; import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.common.TopicQueueMappingUtils; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -34,9 +35,12 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class UpdateStaticTopicSubCommand implements SubCommand { @@ -73,7 +77,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { return options; } - private void validate(Map.Entry entry, boolean shouldNull) { + private void validateIfNull(Map.Entry entry, boolean shouldNull) { if (shouldNull) { if (entry.getValue().getTopicQueueMappingInfo() != null) { throw new RuntimeException("Mapping info should be null in broker " + entry.getKey()); @@ -82,30 +86,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand { if (entry.getValue().getTopicQueueMappingInfo() == null) { throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey()); } - if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) { - throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname())); - } } } - public void validateQueueMappingInfo(Map> globalIdMap, TopicQueueMappingDetail mappingDetail) { - if (mappingDetail.isDirty()) { - throw new RuntimeException("The mapping info is dirty in broker " + mappingDetail.getBname()); - } - for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { - Integer globalid = entry.getKey(); - String leaerBrokerName = entry.getValue().iterator().next().getBname(); - if (!leaerBrokerName.equals(mappingDetail.getBname())) { - //not the leader - continue; - } - if (globalIdMap.containsKey(globalid)) { - throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname())); - } else { - globalIdMap.put(globalid, entry.getValue()); - } - } - } @Override public void execute(final CommandLine commandLine, final Options options, @@ -129,7 +112,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is null for " + cluster); } clientMetadata.refreshClusterInfo(clusterInfo); - //first get the existed topic config and mapping { TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); @@ -146,29 +128,45 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } } } - } + // the { if (!existedTopicConfigMap.isEmpty()) { - Iterator> it = existedTopicConfigMap.entrySet().iterator(); - Map.Entry first = it.next(); - validate(first, false); - validateQueueMappingInfo(globalIdMap, first.getValue().getTopicQueueMappingInfo()); - TopicQueueMappingDetail firstMapping = first.getValue().getTopicQueueMappingInfo(); - while (it.hasNext()) { - Map.Entry next = it.next(); - validate(next, false); - validateQueueMappingInfo(globalIdMap, next.getValue().getTopicQueueMappingInfo()); - TopicQueueMappingDetail nextMapping = next.getValue().getTopicQueueMappingInfo(); - if (firstMapping.getEpoch() != nextMapping.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s %s", firstMapping.getEpoch(), nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname())); + //make sure it it not null + existedTopicConfigMap.entrySet().forEach(entry -> { + validateIfNull(entry, false); + }); + //make sure the detail is not dirty + existedTopicConfigMap.entrySet().forEach(entry -> { + if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) { + throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname())); + } + if (entry.getValue().getTopicQueueMappingInfo().isDirty()) { + throw new RuntimeException("The mapping info is dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname()); + } + }); + + List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList()); + //check the epoch and qnum + Map.Entry maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); + detailList.forEach( mappingDetail -> { + if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); } - if (firstMapping.getTotalQueues() != nextMapping.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname())); + if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); } + }); + + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); + + if (maxEpochAndNum.getValue() != globalIdMap.size()) { + throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); } - if (firstMapping.getTotalQueues() != globalIdMap.size()) { - throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", firstMapping.getTotalQueues(), globalIdMap.size())); + for (int i = 0; i < maxEpochAndNum.getValue(); i++) { + if (!globalIdMap.containsKey(i)) { + throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); + } } } } -- GitLab