From f225fd05de44f5e46dc5da34a27fe0b1e947f0ef Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 18 Nov 2021 17:31:41 +0800 Subject: [PATCH] Polish the code for command --- .../statictopic/TopicQueueMappingUtils.java | 17 +- .../TopicRemappingDetailWrapper.java | 25 ++ .../topic/RemappingStaticTopicSubCommand.java | 299 +++++++++++------- .../topic/UpdateStaticTopicSubCommand.java | 26 +- 4 files changed, 225 insertions(+), 142 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 9784199c..545b7cff 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -116,13 +116,12 @@ public class TopicQueueMappingUtils { return detailList; } - public static Map.Entry validConsistenceOfTopicConfigAndQueueMapping(Map brokerConfigMap) { + public static Map.Entry checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map brokerConfigMap) { if (brokerConfigMap == null || brokerConfigMap.isEmpty()) { return null; } //make sure it it not null - String topic = null; long maxEpoch = -1; int maxNum = -1; for (Map.Entry entry : brokerConfigMap.entrySet()) { @@ -143,9 +142,7 @@ public class TopicQueueMappingUtils { } if (topic != null && !topic.equals(mappingDetail.getTopic())) { - throw new RuntimeException("The topic name is inconsistent in broker " + broker); - } else { - topic = mappingDetail.getTopic(); + throw new RuntimeException("The topic name is not match for broker " + broker); } if (maxEpoch != -1 @@ -165,7 +162,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleEntry(maxEpoch, maxNum); } - public static Map buildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { + public static Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { @@ -216,10 +213,14 @@ public class TopicQueueMappingUtils { return items.get(items.size() - 1); } - public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) { + public static String writeToTemp(TopicRemappingDetailWrapper wrapper, boolean after) { String topic = wrapper.getTopic(); String data = wrapper.toJson(); - String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix; + String suffix = TopicRemappingDetailWrapper.SUFFIX_BEFORE; + if (after) { + suffix = TopicRemappingDetailWrapper.SUFFIX_AFTER; + } + String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + suffix; try { MixAll.string2File(data, fileName); return fileName; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java index d229203b..a10dba37 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java @@ -3,12 +3,17 @@ package org.apache.rocketmq.common.statictopic; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class TopicRemappingDetailWrapper extends RemotingSerializable { public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE"; public static final String TYPE_REMAPPING = "REMAPPING"; + public static final String SUFFIX_BEFORE = ".before"; + public static final String SUFFIX_AFTER = ".after"; + private final String topic; private final String type; @@ -17,6 +22,10 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable { private Map brokerConfigMap = new HashMap(); + private Set brokerToMapIn = new HashSet(); + + private Set brokerToMapOut = new HashSet(); + public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map expectedIdToBroker, Map brokerConfigMap) { this.topic = topic; this.type = type; @@ -44,4 +53,20 @@ public class TopicRemappingDetailWrapper extends RemotingSerializable { public Map getBrokerConfigMap() { return brokerConfigMap; } + + public Set getBrokerToMapIn() { + return brokerToMapIn; + } + + public void setBrokerToMapIn(Set brokerToMapIn) { + this.brokerToMapIn = brokerToMapIn; + } + + public Set getBrokerToMapOut() { + return brokerToMapOut; + } + + public void setBrokerToMapOut(Set brokerToMapOut) { + this.brokerToMapOut = brokerToMapOut; + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index 472ce0b4..656396c4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -21,9 +21,9 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.admin.TopicOffset; @@ -33,6 +33,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -79,24 +80,129 @@ public class RemappingStaticTopicSubCommand implements SubCommand { opt = new Option("t", "topic", true, "topic name"); opt.setRequired(true); options.addOption(opt); + + opt = new Option("f", "mapFile", true, "The map file name"); + opt.setRequired(false); + options.addOption(opt); return options; } + public void executeFromFile(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + ClientMetadata clientMetadata = new ClientMetadata(); + + try { + String topic = commandLine.getOptionValue('t').trim(); + + String mapFileName = commandLine.getOptionValue('f').trim(); + String mapData = MixAll.file2String(mapFileName); + TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); + //double check the config + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); + + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + clientMetadata.refreshClusterInfo(clusterInfo); + doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt); + return; + }catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + + + public void doRemapping(String topic, Set brokersToMapIn, Set brokersToMapOut, Map brokerConfigMap, + ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + // now do the remapping + //Step1: let the new leader can be write without the logicOffset + for (String broker: brokersToMapIn) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } + //Step2: forbid the write of old leader + for (String broker: brokersToMapOut) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } + //Step3: decide the logic offset + for (String broker: brokersToMapOut) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic); + TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker); + for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { + ImmutableList items = entry.getValue(); + Integer globalId = entry.getKey(); + if (items.size() < 2) { + continue; + } + LogicQueueMappingItem newLeader = items.get(items.size() - 1); + LogicQueueMappingItem oldLeader = items.get(items.size() - 2); + if (newLeader.getLogicOffset() > 0) { + continue; + } + TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId())); + if (topicOffset == null) { + throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader); + } + //TODO check the max offset, will it return -1? + if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { + throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); + } + newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000)); + TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); + //fresh the new leader + mapInConfig.getMappingDetail().putMappingInfo(globalId, items); + } + } + //Step4: write to the new leader with logic offset + for (String broker: brokersToMapIn) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } + } + + + + + @Override public void execute(final CommandLine commandLine, final Options options, - RPCHook rpcHook) throws SubCommandException { + RPCHook rpcHook) throws SubCommandException { + + if (!commandLine.hasOption('t')) { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + return; + } + + if (commandLine.hasOption("f")) { + executeFromFile(commandLine, options, rpcHook); + return; + } + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); ClientMetadata clientMetadata = new ClientMetadata(); - Map existedTopicConfigMap = new HashMap<>(); + Map brokerConfigMap = new HashMap<>(); Map globalIdMap = new HashMap<>(); - Set brokers = new HashSet<>(); - Map.Entry maxEpochAndNum = null; + Set targetBrokers = new HashSet<>(); + try { - if ((!commandLine.hasOption("b") && !commandLine.hasOption('c')) - || !commandLine.hasOption('t')) { + if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) { ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); return; } @@ -108,84 +214,67 @@ public class RemappingStaticTopicSubCommand implements SubCommand { throw new RuntimeException("The Cluster info is empty"); } clientMetadata.refreshClusterInfo(clusterInfo); - - if (commandLine.hasOption("b")) { - String brokerStrs = commandLine.getOptionValue("b").trim(); - for (String broker: brokerStrs.split(",")) { - brokers.add(broker.trim()); - } - } else if (commandLine.hasOption("c")) { - String clusters = commandLine.getOptionValue('c').trim(); - for (String cluster : clusters.split(",")) { - cluster = cluster.trim(); - if (clusterInfo.getClusterAddrTable().get(cluster) != null) { - brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster)); + { + if (commandLine.hasOption("b")) { + String brokerStrs = commandLine.getOptionValue("b").trim(); + for (String broker: brokerStrs.split(",")) { + targetBrokers.add(broker.trim()); + } + } else if (commandLine.hasOption("c")) { + String clusters = commandLine.getOptionValue('c').trim(); + for (String cluster : clusters.split(",")) { + cluster = cluster.trim(); + if (clusterInfo.getClusterAddrTable().get(cluster) != null) { + targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster)); + } } } - } - if (brokers.isEmpty()) { - throw new RuntimeException("Find none brokers"); - } - for (String broker : brokers) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - if (addr == null) { - throw new RuntimeException("Can't find addr for broker " + broker); + if (targetBrokers.isEmpty()) { + throw new RuntimeException("Find none brokers, do nothing"); + } + for (String broker : targetBrokers) { + String addr = clientMetadata.findMasterBrokerAddr(broker); + if (addr == null) { + throw new RuntimeException("Can't find addr for broker " + broker); + } } } //get the existed topic config and mapping - TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - existedTopicConfigMap.put(bname, mapping); + { + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } } } } - if (existedTopicConfigMap.isEmpty()) { + if (brokerConfigMap.isEmpty()) { throw new RuntimeException("No topic route to do the remapping"); } - //make sure it it not null - existedTopicConfigMap.forEach((key, value) -> { - if (value.getMappingDetail() != null) { - throw new RuntimeException("Mapping info should be null in broker " + key); - } - }); - //make sure the detail is not dirty - existedTopicConfigMap.forEach((key, value) -> { - if (!key.equals(value.getMappingDetail().getBname())) { - throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname())); - } - if (value.getMappingDetail().isDirty()) { - throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname()); - } - }); - - List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); - //check the epoch and qnum - maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - for (TopicQueueMappingDetail mappingDetail : detailList) { - if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); - } - if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); - } - } - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true); - + final Map.Entry maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); //the check is ok, now do the mapping allocation int maxNum = maxEpochAndNum.getValue(); long maxEpoch = maxEpochAndNum.getKey(); - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0))); + + { + TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap); + String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); + System.out.println("The old mapping data is written to file " + oldMappingDataFile); + } + + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0))); allocator.upToNum(maxNum); Map expectedBrokerNumMap = allocator.getBrokerNumMap(); Queue waitAssignQueues = new ArrayDeque(); @@ -213,6 +302,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { expectedIdToBroker.put(queueId, broker); } }); + long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); //Now construct the remapping info Set brokersToMapOut = new HashSet<>(); @@ -230,8 +320,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand { String mapOutBroker = topicQueueMappingOne.getBname(); brokersToMapIn.add(mapInBroker); brokersToMapOut.add(mapOutBroker); - TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker); - TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker); + TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker); + TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker); mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); @@ -246,61 +336,24 @@ public class RemappingStaticTopicSubCommand implements SubCommand { mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems); } - long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); - existedTopicConfigMap.values().forEach( configMapping -> { - configMapping.getMappingDetail().setEpoch(epoch); + brokerConfigMap.values().forEach(configMapping -> { + configMapping.getMappingDetail().setEpoch(newEpoch); configMapping.getMappingDetail().setTotalQueues(maxNum); }); - TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true); - // now do the remapping - //Step1: let the new leader can be write without the logicOffset - for (String broker: brokersToMapIn) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); - } - //Step2: forbid the write of old leader - for (String broker: brokersToMapOut) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); - } - //Step3: decide the logic offset - for (String broker: brokersToMapOut) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic); - TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker); - for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { - ImmutableList items = entry.getValue(); - Integer globalId = entry.getKey(); - if (items.size() < 2) { - continue; - } - LogicQueueMappingItem newLeader = items.get(items.size() - 1); - LogicQueueMappingItem oldLeader = items.get(items.size() - 2); - if (newLeader.getLogicOffset() > 0) { - continue; - } - TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId())); - if (topicOffset == null) { - throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader); - } - //TODO check the max offset, will it return -1? - if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { - throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); - } - newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000)); - TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname()); - //fresh the new leader - mapInConfig.getMappingDetail().putMappingInfo(globalId, items); - } - } - //Step4: write to the new leader with logic offset - for (String broker: brokersToMapIn) { - String addr = clientMetadata.findMasterBrokerAddr(broker); - TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker); - defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + //double check + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true); + + { + TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap); + newWrapper.setBrokerToMapIn(brokersToMapIn); + newWrapper.setBrokerToMapOut(brokersToMapOut); + String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); + System.out.println("The old mapping data is written to file " + newMappingDataFile); } + + doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt); + } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 21ce1cf0..cdfdcfe3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -import java.nio.charset.Charset; import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; @@ -84,7 +83,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { options.addOption(opt); opt = new Option("f", "mapFile", true, "The map file name"); - opt.setRequired(true); + opt.setRequired(false); options.addOption(opt); return options; @@ -99,12 +98,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ClientMetadata clientMetadata = new ClientMetadata(); try { + String topic = commandLine.getOptionValue('t').trim(); String mapFileName = commandLine.getOptionValue('f').trim(); String mapData = MixAll.file2String(mapFileName); TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); //double check the config - TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap()); - TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap()); + TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); if (clusterInfo == null @@ -131,6 +131,11 @@ public class UpdateStaticTopicSubCommand implements SubCommand { @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + if (!commandLine.hasOption('t')) { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + return; + } + if (commandLine.hasOption("f")) { executeFromFile(commandLine, options, rpcHook); return; @@ -159,7 +164,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } clientMetadata.refreshClusterInfo(clusterInfo); - String clusters = commandLine.getOptionValue('c').trim(); for (String cluster : clusters.split(",")) { cluster = cluster.trim(); @@ -200,8 +204,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand { Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { - maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap); - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); } if (queueNum < globalIdMap.size()) { throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); @@ -213,7 +217,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { { TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap); - String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before"); + String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); System.out.println("The old mapping data is written to file " + oldMappingDataFile); } @@ -264,12 +268,12 @@ public class UpdateStaticTopicSubCommand implements SubCommand { configMapping.getMappingDetail().setTotalQueues(queueNum); }); //double check the config - TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap); - TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); { TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap); - String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after"); + String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); System.out.println("The new mapping data is written to file " + newMappingDataFile); } -- GitLab