From 25a588b81fd58dccb1e5f81e8a65859570a76a89 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 18 Nov 2021 12:00:15 +0800 Subject: [PATCH] Init the remapping command --- .../rocketmq/common/TopicQueueMappingOne.java | 54 ++++ .../common/TopicQueueMappingUtils.java | 6 +- .../topic/RemappingStaticTopicSubCommand.java | 267 ++++++++++++++++++ .../topic/UpdateStaticTopicSubCommand.java | 38 ++- 4 files changed, 348 insertions(+), 17 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java new file mode 100644 index 00000000..150a2087 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import com.google.common.collect.ImmutableList; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TopicQueueMappingOne extends RemotingSerializable { + + String topic; // redundant field + String bname; //identify the hosted broker name + Integer globalId; + ImmutableList items; + + public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList items) { + this.topic = topic; + this.bname = bname; + this.globalId = globalId; + this.items = items; + } + + public String getTopic() { + return topic; + } + + public String getBname() { + return bname; + } + + public Integer getGlobalId() { + return globalId; + } + + public ImmutableList getItems() { + return items; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java index 686208a8..ff89aaf8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java @@ -103,7 +103,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); } - public static Map> buildMappingItems(List mappingDetailList, boolean replace) { + public static Map buildMappingItems(List mappingDetailList, boolean replace) { Collections.sort(mappingDetailList, new Comparator() { @Override public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { @@ -111,7 +111,7 @@ public class TopicQueueMappingUtils { } }); - Map> globalIdMap = new HashMap>(); + Map globalIdMap = new HashMap(); for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { Integer globalid = entry.getKey(); @@ -125,7 +125,7 @@ public class TopicQueueMappingUtils { throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname())); } } else { - globalIdMap.put(globalid, entry.getValue()); + globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue())); } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java new file mode 100644 index 00000000..4cc5acfc --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.LogicQueueMappingItem; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.common.TopicQueueMappingOne; +import org.apache.rocketmq.common.TopicQueueMappingUtils; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.util.AbstractMap; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; + +public class RemappingStaticTopicSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateStaticTopic"; + } + + @Override + public String commandDesc() { + return "Update or create static topic, which has fixed number of queues"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); + + Option opt = null; + + opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated"); + optionGroup.addOption(opt); + + opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + + @Override + public void execute(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + ClientMetadata clientMetadata = new ClientMetadata(); + Map existedTopicConfigMap = new HashMap<>(); + Map globalIdMap = new HashMap<>(); + Set brokers = new HashSet<>(); + Map.Entry maxEpochAndNum = null; + try { + if ((!commandLine.hasOption("b") && !commandLine.hasOption('c')) + || !commandLine.hasOption('t')) { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + return; + } + String topic = commandLine.getOptionValue('t').trim(); + + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + clientMetadata.refreshClusterInfo(clusterInfo); + + if (commandLine.hasOption("b")) { + String brokerStrs = commandLine.getOptionValue("b").trim(); + for (String broker: brokerStrs.split(",")) { + brokers.add(broker.trim()); + } + } else if (commandLine.hasOption("c")) { + String clusters = commandLine.getOptionValue('c').trim(); + for (String cluster : clusters.split(",")) { + cluster = cluster.trim(); + if (clusterInfo.getClusterAddrTable().get(cluster) != null) { + brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster)); + } + } + } + if (brokers.isEmpty()) { + throw new RuntimeException("Find none brokers"); + } + for (String broker : brokers) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + if (addr == null) { + throw new RuntimeException("Can't find addr for broker " + broker); + } + } + + //get the existed topic config and mapping + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + existedTopicConfigMap.put(bname, mapping); + } + } + } + + if (existedTopicConfigMap.isEmpty()) { + throw new RuntimeException("No topic route to do the remapping"); + } + + //make sure it it not null + existedTopicConfigMap.forEach((key, value) -> { + if (value.getMappingDetail() != null) { + throw new RuntimeException("Mapping info should be null in broker " + key); + } + }); + //make sure the detail is not dirty + existedTopicConfigMap.forEach((key, value) -> { + if (!key.equals(value.getMappingDetail().getBname())) { + throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname())); + } + if (value.getMappingDetail().isDirty()) { + throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname()); + } + }); + + List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); + //check the epoch and qnum + maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); + final Map.Entry tmpMaxEpochAndNum = maxEpochAndNum; + detailList.forEach( mappingDetail -> { + if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); + } + if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); + } + }); + + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false); + + if (maxEpochAndNum.getValue() != globalIdMap.size()) { + throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size())); + } + for (int i = 0; i < maxEpochAndNum.getValue(); i++) { + if (!globalIdMap.containsKey(i)) { + throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); + } + } + + //the check is ok, now do the mapping allocation + int maxNum = maxEpochAndNum.getValue(); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0))); + allocator.upToNum(maxNum); + Map expectedBrokerNumMap = allocator.getBrokerNumMap(); + Queue waitAssignQueues = new ArrayDeque(); + Map expectedIdToBroker = new HashMap<>(); + //the following logic will make sure that, for one broker, only "take in" or "take out" queues + //It can't, take in some queues but alse take out some queues. + globalIdMap.forEach((queueId, mappingOne) -> { + String leaderBroker = mappingOne.getBname(); + if (expectedBrokerNumMap.containsKey(leaderBroker)) { + if (expectedBrokerNumMap.get(leaderBroker) > 0) { + expectedIdToBroker.put(queueId, leaderBroker); + expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1); + } else { + waitAssignQueues.add(queueId); + expectedBrokerNumMap.remove(leaderBroker); + } + } else { + waitAssignQueues.add(queueId); + } + }); + expectedBrokerNumMap.forEach((broker, queueNum) -> { + for (int i = 0; i < queueNum; i++) { + expectedIdToBroker.put(waitAssignQueues.poll(), broker); + } + }); + + Set + + //Now construct the remapping info + + //construct the topic configAndMapping + long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); + for (Map.Entry e : expectedIdToBroker.entrySet()) { + Integer queueId = e.getKey(); + String broker = e.getValue(); + if (globalIdMap.containsKey(queueId)) { + //ignore the exited + continue; + } + TopicConfigAndQueueMapping configMapping; + if (!existedTopicConfigMap.containsKey(broker)) { + TopicConfig topicConfig = new TopicConfig(topic, 1, 1); + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch); + configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); + existedTopicConfigMap.put(broker, configMapping); + } else { + configMapping = existedTopicConfigMap.get(broker); + configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.getMappingDetail().setEpoch(epoch); + configMapping.getMappingDetail().setTotalQueues(0); + } + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); + configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); + } + + //If some succeed, and others fail, it will cause inconsistent data + for (Map.Entry entry : existedTopicConfigMap.entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 29a7261b..a1ff0b00 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.common.TopicQueueMappingOne; import org.apache.rocketmq.common.TopicQueueMappingUtils; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.QueueData; @@ -64,7 +65,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Option opt = null; - opt = new Option("c", "clusterName", true, "create topic to which cluster"); + opt = new Option("c", "clusters", true, "create topic to clusters, comma separated"); optionGroup.addOption(opt); optionGroup.setRequired(true); @@ -90,7 +91,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ClientMetadata clientMetadata = new ClientMetadata(); Map existedTopicConfigMap = new HashMap<>(); - Map> globalIdMap = new HashMap<>(); + Map globalIdMap = new HashMap<>(); + Set brokers = new HashSet<>(); + try { if (!commandLine.hasOption('t') || !commandLine.hasOption('c') @@ -108,7 +111,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } else { clientMetadata.refreshClusterInfo(clusterInfo); } - Set brokers = new HashSet<>(); for (String cluster : clusters.split(",")) { cluster = cluster.trim(); if (clusterInfo.getClusterAddrTable().get(cluster) != null) { @@ -118,6 +120,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand { if (brokers.isEmpty()) { throw new RuntimeException("Find none brokers for " + clusters); } + for (String broker : brokers) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + if (addr == null) { + throw new RuntimeException("Can't find addr for broker " + broker); + } + } //get the existed topic config and mapping TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); @@ -188,7 +196,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Map brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)); Map idToBroker = new HashMap<>(); globalIdMap.forEach((key, value) -> { - String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value); + String leaderbroker = value.getBname(); idToBroker.put(key, leaderbroker); if (!brokerNumMap.containsKey(leaderbroker)) { brokerNumMap.put(leaderbroker, 1); @@ -204,27 +212,29 @@ public class UpdateStaticTopicSubCommand implements SubCommand { long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); for (Map.Entry e : newIdToBroker.entrySet()) { Integer queueId = e.getKey(); - String value = e.getValue(); + String broker = e.getValue(); if (globalIdMap.containsKey(queueId)) { //ignore the exited continue; } TopicConfigAndQueueMapping configMapping; - if (!existedTopicConfigMap.containsKey(value)) { - TopicConfig topicConfig = new TopicConfig(topic, 1, 1); - TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch); - configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); + if (!existedTopicConfigMap.containsKey(broker)) { + configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1)); + configMapping.setWriteQueueNums(1); + configMapping.setReadQueueNums(1); + existedTopicConfigMap.put(broker, configMapping); } else { - configMapping = existedTopicConfigMap.get(value); + configMapping = existedTopicConfigMap.get(broker); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); - configMapping.getMappingDetail().setEpoch(epoch); - configMapping.getMappingDetail().setTotalQueues(queueNum); } - LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1); + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); } - + existedTopicConfigMap.values().forEach( configMapping -> { + configMapping.getMappingDetail().setEpoch(epoch); + configMapping.getMappingDetail().setTotalQueues(queueNum); + }); //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : existedTopicConfigMap.entrySet()) { String broker = entry.getKey(); -- GitLab