From 165e133a64ae34fa992223f3472f6398d3c9cb2a Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 18 Nov 2021 16:57:19 +0800 Subject: [PATCH] Enable to run from file for createStaticTopic command --- .../body/TopicConfigSerializeWrapper.java | 6 +- .../statictopic/TopicQueueMappingUtils.java | 77 +++++++++ .../TopicRemappingDetailWrapper.java | 47 ++++++ .../TopicQueueMappingTest.java | 12 +- .../topic/UpdateStaticTopicSubCommand.java | 156 +++++++++++------- 5 files changed, 236 insertions(+), 62 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java rename common/src/test/java/org/apache/rocketmq/common/{ => statictopic}/TopicQueueMappingTest.java (88%) diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index 1176e6b9..a71c50a6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.common.protocol.body; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class TopicConfigSerializeWrapper extends RemotingSerializable { private ConcurrentMap topicConfigTable = new ConcurrentHashMap(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index dfb6bbf2..9784199c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -17,9 +17,12 @@ package org.apache.rocketmq.common.statictopic; import com.google.common.collect.ImmutableList; +import org.apache.rocketmq.common.MixAll; +import java.io.File; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -103,6 +106,65 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleImmutableEntry(epoch, queueNum); } + public static List getMappingDetailFromConfig(Collection configs) { + List detailList = new ArrayList(); + for (TopicConfigAndQueueMapping configMapping : configs) { + if (configMapping.getMappingDetail() != null) { + detailList.add(configMapping.getMappingDetail()); + } + } + return detailList; + } + + public static Map.Entry validConsistenceOfTopicConfigAndQueueMapping(Map brokerConfigMap) { + if (brokerConfigMap == null + || brokerConfigMap.isEmpty()) { + return null; + } + //make sure it it not null + String topic = null; + long maxEpoch = -1; + int maxNum = -1; + for (Map.Entry entry : brokerConfigMap.entrySet()) { + String broker = entry.getKey(); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + if (configMapping.getMappingDetail() == null) { + throw new RuntimeException("Mapping info should not be null in broker " + broker); + } + TopicQueueMappingDetail mappingDetail = configMapping.getMappingDetail(); + if (!broker.equals(mappingDetail.getBname())) { + throw new RuntimeException(String.format("The broker name is not equal %s != %s ", broker, mappingDetail.getBname())); + } + if (mappingDetail.isDirty()) { + throw new RuntimeException("The mapping info is dirty in broker " + broker); + } + if (!configMapping.getTopicName().equals(mappingDetail.getTopic())) { + throw new RuntimeException("The topic name is inconsistent in broker " + broker); + } + if (topic != null + && !topic.equals(mappingDetail.getTopic())) { + throw new RuntimeException("The topic name is inconsistent in broker " + broker); + } else { + topic = mappingDetail.getTopic(); + } + + if (maxEpoch != -1 + && maxEpoch != mappingDetail.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpoch, mappingDetail.getEpoch(), mappingDetail.getBname())); + } else { + maxEpoch = mappingDetail.getEpoch(); + } + + if (maxNum != -1 + && maxNum != mappingDetail.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxNum, mappingDetail.getTotalQueues(), mappingDetail.getBname())); + } else { + maxNum = mappingDetail.getTotalQueues(); + } + } + return new AbstractMap.SimpleEntry(maxEpoch, maxNum); + } + public static Map buildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override @@ -153,4 +215,19 @@ public class TopicQueueMappingUtils { assert items.size() > 0; return items.get(items.size() - 1); } + + public static String writeToTemp(TopicRemappingDetailWrapper wrapper, String suffix) { + String topic = wrapper.getTopic(); + String data = wrapper.toJson(); + String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + "-" + suffix; + try { + MixAll.string2File(data, fileName); + return fileName; + } catch (Exception e) { + throw new RuntimeException("write file failed " + fileName,e); + } + } + + + } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java new file mode 100644 index 00000000..d229203b --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java @@ -0,0 +1,47 @@ +package org.apache.rocketmq.common.statictopic; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.HashMap; +import java.util.Map; + +public class TopicRemappingDetailWrapper extends RemotingSerializable { + public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE"; + public static final String TYPE_REMAPPING = "REMAPPING"; + + + private final String topic; + private final String type; + private final long epoch; + private Map expectedIdToBroker = new HashMap(); + + private Map brokerConfigMap = new HashMap(); + + public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map expectedIdToBroker, Map brokerConfigMap) { + this.topic = topic; + this.type = type; + this.epoch = epoch; + this.expectedIdToBroker = expectedIdToBroker; + this.brokerConfigMap = brokerConfigMap; + } + + public String getTopic() { + return topic; + } + + public String getType() { + return type; + } + + public long getEpoch() { + return epoch; + } + + public Map getExpectedIdToBroker() { + return expectedIdToBroker; + } + + public Map getBrokerConfigMap() { + return brokerConfigMap; + } +} diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java similarity index 88% rename from common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java rename to common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java index e6e3528e..a1b3d277 100644 --- a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.common; +package org.apache.rocketmq.common.statictopic; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; @@ -9,10 +9,18 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.Map; public class TopicQueueMappingTest { + @Test + public void testWriteToFile() { + System.out.println(System.getProperty("java.io.tmpdir")); + System.out.println(File.separator); + } + + @Test public void testJsonSerialize() { LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L); @@ -30,7 +38,7 @@ public class TopicQueueMappingTest { Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); - TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01"); + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis()); mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem)); String mappingDetailJson = JSON.toJSONString(mappingDetail); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 65c73772..21ce1cf0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; @@ -31,17 +32,18 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import java.nio.charset.Charset; import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -81,18 +83,64 @@ public class UpdateStaticTopicSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); + opt = new Option("f", "mapFile", true, "The map file name"); + opt.setRequired(true); + options.addOption(opt); + return options; } + + public void executeFromFile(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + ClientMetadata clientMetadata = new ClientMetadata(); + + try { + String mapFileName = commandLine.getOptionValue('f').trim(); + String mapData = MixAll.file2String(mapFileName); + TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class); + //double check the config + TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(wrapper.getBrokerConfigMap()); + TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true); + + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty()) { + throw new RuntimeException("The Cluster info is empty"); + } + clientMetadata.refreshClusterInfo(clusterInfo); + //If some succeed, and others fail, it will cause inconsistent data + for (Map.Entry entry : wrapper.getBrokerConfigMap().entrySet()) { + String broker = entry.getKey(); + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail()); + } + return; + }catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + if (commandLine.hasOption("f")) { + executeFromFile(commandLine, options, rpcHook); + return; + } + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - ClientMetadata clientMetadata = new ClientMetadata(); - Map existedTopicConfigMap = new HashMap<>(); + + Map brokerConfigMap = new HashMap<>(); Map globalIdMap = new HashMap<>(); Set brokers = new HashSet<>(); @@ -111,8 +159,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } clientMetadata.refreshClusterInfo(clusterInfo); - String topic = commandLine.getOptionValue('t').trim(); - int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); + String clusters = commandLine.getOptionValue('c').trim(); for (String cluster : clusters.split(",")) { cluster = cluster.trim(); @@ -131,51 +178,30 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } //get the existed topic config and mapping - TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); - clientMetadata.freshTopicRoute(topic, routeData); - if (routeData != null - && !routeData.getQueueDatas().isEmpty()) { - for (QueueData queueData: routeData.getQueueDatas()) { - String bname = queueData.getBrokerName(); - String addr = clientMetadata.findMasterBrokerAddr(bname); - TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); - //allow the config is null - if (mapping != null) { - existedTopicConfigMap.put(bname, mapping); + String topic = commandLine.getOptionValue('t').trim(); + int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); + { + TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + brokerConfigMap.put(bname, mapping); + } } } } Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); - if (!existedTopicConfigMap.isEmpty()) { - //make sure it it not null - existedTopicConfigMap.forEach((key, value) -> { - if (value.getMappingDetail() != null) { - throw new RuntimeException("Mapping info should be null in broker " + key); - } - }); - //make sure the detail is not dirty - existedTopicConfigMap.forEach((key, value) -> { - if (!key.equals(value.getMappingDetail().getBname())) { - throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname())); - } - if (value.getMappingDetail().isDirty()) { - throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname()); - } - }); - - List detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList()); - //check the epoch and qnum - maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList); - for (TopicQueueMappingDetail mappingDetail : detailList) { - if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) { - throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname())); - } - if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) { - throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname())); - } - } - globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true); + if (!brokerConfigMap.isEmpty()) { + maxEpochAndNum = TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); } if (queueNum < globalIdMap.size()) { throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); @@ -184,24 +210,32 @@ public class UpdateStaticTopicSubCommand implements SubCommand { if (queueNum == globalIdMap.size()) { throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing"); } + + { + TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap); + String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, "before"); + System.out.println("The old mapping data is written to file " + oldMappingDataFile); + } + + //the check is ok, now do the mapping allocation Map brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)); - Map idToBroker = new HashMap<>(); + final Map oldIdToBroker = new HashMap<>(); globalIdMap.forEach((key, value) -> { String leaderbroker = value.getBname(); - idToBroker.put(key, leaderbroker); + oldIdToBroker.put(key, leaderbroker); if (!brokerNumMap.containsKey(leaderbroker)) { brokerNumMap.put(leaderbroker, 1); } else { brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1); } }); - TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap); + TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap); allocator.upToNum(queueNum); Map newIdToBroker = allocator.getIdToBroker(); //construct the topic configAndMapping - long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); + long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); for (Map.Entry e : newIdToBroker.entrySet()) { Integer queueId = e.getKey(); String broker = e.getValue(); @@ -210,13 +244,13 @@ public class UpdateStaticTopicSubCommand implements SubCommand { continue; } TopicConfigAndQueueMapping configMapping; - if (!existedTopicConfigMap.containsKey(broker)) { + if (!brokerConfigMap.containsKey(broker)) { configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1)); configMapping.setWriteQueueNums(1); configMapping.setReadQueueNums(1); - existedTopicConfigMap.put(broker, configMapping); + brokerConfigMap.put(broker, configMapping); } else { - configMapping = existedTopicConfigMap.get(broker); + configMapping = brokerConfigMap.get(broker); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1); } @@ -224,15 +258,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand { configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); } - //double check the topic config map - existedTopicConfigMap.values().forEach( configMapping -> { - configMapping.getMappingDetail().setEpoch(epoch); + // set the topic config + brokerConfigMap.values().forEach(configMapping -> { + configMapping.getMappingDetail().setEpoch(newEpoch); configMapping.getMappingDetail().setTotalQueues(queueNum); }); - TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true); + //double check the config + TopicQueueMappingUtils.validConsistenceOfTopicConfigAndQueueMapping(brokerConfigMap); + TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + + { + TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap); + String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, "after"); + System.out.println("The new mapping data is written to file " + newMappingDataFile); + } //If some succeed, and others fail, it will cause inconsistent data - for (Map.Entry entry : existedTopicConfigMap.entrySet()) { + for (Map.Entry entry : brokerConfigMap.entrySet()) { String broker = entry.getKey(); String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = entry.getValue(); -- GitLab