提交 295a6bde 编写于 作者: D dongeforever

Polish the update utils

上级 f12eceb4
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import com.google.common.collect.ImmutableList;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class TopicQueueMappingUtils {
public static class MappingState {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
int currentIndex = 0;
Random random = new Random();
List<String> leastBrokers = new ArrayList<String>();
private MappingState(Map<String, Integer> brokerNumMap) {
this.brokerNumMap.putAll(brokerNumMap);
}
public void freshState() {
int minNum = -1;
for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
if (entry.getValue() > minNum) {
leastBrokers.clear();
leastBrokers.add(entry.getKey());
} else if (entry.getValue() == minNum) {
leastBrokers.add(entry.getKey());
}
}
currentIndex = random.nextInt(leastBrokers.size());
}
public String nextBroker() {
if (leastBrokers.isEmpty()) {
freshState();
}
int tmpIndex = (++currentIndex) % leastBrokers.size();
String broker = leastBrokers.remove(tmpIndex);
currentIndex--;
return broker;
}
}
public static MappingState buildMappingState(Map<String, Integer> brokerNumMap) {
return new MappingState(brokerNumMap);
}
public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
int epoch = -1;
int queueNum = 0;
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.getEpoch() > epoch) {
epoch = mappingDetail.getEpoch();
}
if (mappingDetail.getTotalQueues() > queueNum) {
queueNum = mappingDetail.getTotalQueues();
}
}
return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch, queueNum);
}
public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
return o2.getEpoch() - o1.getEpoch();
}
});
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaerBrokerName = entry.getValue().iterator().next().getBname();
if (!leaerBrokerName.equals(mappingDetail.getBname())) {
//not the leader
continue;
}
if (globalIdMap.containsKey(globalid)) {
if (!replace) {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
}
} else {
globalIdMap.put(globalid, entry.getValue());
}
}
}
return globalIdMap;
}
}
......@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -34,9 +35,12 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class UpdateStaticTopicSubCommand implements SubCommand {
......@@ -73,7 +77,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
return options;
}
private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry, boolean shouldNull) {
private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping> entry, boolean shouldNull) {
if (shouldNull) {
if (entry.getValue().getTopicQueueMappingInfo() != null) {
throw new RuntimeException("Mapping info should be null in broker " + entry.getKey());
......@@ -82,30 +86,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (entry.getValue().getTopicQueueMappingInfo() == null) {
throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey());
}
if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname()));
}
}
}
public void validateQueueMappingInfo(Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail mappingDetail) {
if (mappingDetail.isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + mappingDetail.getBname());
}
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaerBrokerName = entry.getValue().iterator().next().getBname();
if (!leaerBrokerName.equals(mappingDetail.getBname())) {
//not the leader
continue;
}
if (globalIdMap.containsKey(globalid)) {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
} else {
globalIdMap.put(globalid, entry.getValue());
}
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
......@@ -129,7 +112,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
throw new RuntimeException("The Cluster info is null for " + cluster);
}
clientMetadata.refreshClusterInfo(clusterInfo);
//first get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
......@@ -146,29 +128,45 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
}
}
}
}
// the
{
if (!existedTopicConfigMap.isEmpty()) {
Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it = existedTopicConfigMap.entrySet().iterator();
Map.Entry<String, TopicConfigAndQueueMapping> first = it.next();
validate(first, false);
validateQueueMappingInfo(globalIdMap, first.getValue().getTopicQueueMappingInfo());
TopicQueueMappingDetail firstMapping = first.getValue().getTopicQueueMappingInfo();
while (it.hasNext()) {
Map.Entry<String, TopicConfigAndQueueMapping> next = it.next();
validate(next, false);
validateQueueMappingInfo(globalIdMap, next.getValue().getTopicQueueMappingInfo());
TopicQueueMappingDetail nextMapping = next.getValue().getTopicQueueMappingInfo();
if (firstMapping.getEpoch() != nextMapping.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s %s", firstMapping.getEpoch(), nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
//make sure it it not null
existedTopicConfigMap.entrySet().forEach(entry -> {
validateIfNull(entry, false);
});
//make sure the detail is not dirty
existedTopicConfigMap.entrySet().forEach(entry -> {
if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname()));
}
if (entry.getValue().getTopicQueueMappingInfo().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
//check the epoch and qnum
Map.Entry<Integer, Integer> maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
detailList.forEach( mappingDetail -> {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (firstMapping.getTotalQueues() != nextMapping.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
});
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
}
if (firstMapping.getTotalQueues() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", firstMapping.getTotalQueues(), globalIdMap.size()));
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册