diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 5f843a9968b0185eb0777ccbfea13e5706a5c404..e8c02a7e164db8a4e1b27b4f1e7a96c73cd8c2b7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -234,6 +234,43 @@ public class TopicQueueMappingUtils { } } + public static void checkIfReusePhysicalQueue(Collection mappingOnes) { + Map physicalQueueIdMap = new HashMap(); + for (TopicQueueMappingOne mappingOne : mappingOnes) { + for (LogicQueueMappingItem item: mappingOne.items) { + String physicalQueueId = item.getBname() + "-" + item.getQueueId(); + if (physicalQueueIdMap.containsKey(physicalQueueId)) { + throw new RuntimeException(String.format("Topic %s global queue id %d and %d shared the same physical queue %s", + mappingOne.topic, mappingOne.globalId, physicalQueueIdMap.get(physicalQueueId).globalId, physicalQueueId)); + } else { + physicalQueueIdMap.put(physicalQueueId, mappingOne); + } + } + } + } + + public static void checkPhysicalQueueConsistence(Map brokerConfigMap) { + for (Map.Entry entry : brokerConfigMap.entrySet()) { + TopicConfigAndQueueMapping configMapping = entry.getValue(); + assert configMapping != null; + assert configMapping.getMappingDetail() != null; + if (configMapping.getReadQueueNums() < configMapping.getWriteQueueNums()) { + throw new RuntimeException("Read queues is smaller than write queues"); + } + for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { + for (LogicQueueMappingItem item: items) { + if (item.getStartOffset() != 0) { + throw new RuntimeException("The start offset dose not begin from 0"); + } + TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); + if (item.getQueueId() >= topicConfig.getWriteQueueNums()) { + throw new RuntimeException("The physical queue id is overflow the write queues"); + } + } + } + } + } + public static Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { Collections.sort(mappingDetailList, new Comparator() { @Override @@ -275,6 +312,7 @@ public class TopicQueueMappingUtils { } } } + checkIfReusePhysicalQueue(globalIdMap.values()); return globalIdMap; } @@ -312,12 +350,23 @@ public class TopicQueueMappingUtils { } } + public static void checkIfTargetBrokersComplete(Set targetBrokers, Map brokerConfigMap) { + for (String broker : brokerConfigMap.keySet()) { + if (!targetBrokers.contains(broker)) { + throw new RuntimeException("The existed broker " + broker + " dose not in target brokers "); + } + } + } + public static TopicRemappingDetailWrapper createTopicConfigMapping(String topic, int queueNum, Set targetBrokers, Map brokerConfigMap) { + checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap); Map globalIdMap = new HashMap(); Map.Entry maxEpochAndNum = new AbstractMap.SimpleImmutableEntry(System.currentTimeMillis(), queueNum); if (!brokerConfigMap.isEmpty()) { maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + checkIfReusePhysicalQueue(globalIdMap.values()); + checkPhysicalQueueConsistence(brokerConfigMap); } if (queueNum < globalIdMap.size()) { throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); @@ -377,9 +426,12 @@ public class TopicQueueMappingUtils { configMapping.getMappingDetail().setTotalQueues(queueNum); } //double check the config - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); - TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); - + { + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + checkIfReusePhysicalQueue(globalIdMap.values()); + checkPhysicalQueueConsistence(brokerConfigMap); + } return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, brokerConfigMap, new HashSet(), new HashSet()); } @@ -454,7 +506,7 @@ public class TopicQueueMappingUtils { List items = new ArrayList(topicQueueMappingOne.getItems()); LogicQueueMappingItem last = items.get(items.size() - 1); - items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1)); + items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1)); ImmutableList resultItems = ImmutableList.copyOf(items); //Use the same object @@ -469,8 +521,10 @@ public class TopicQueueMappingUtils { } //double check - TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); - TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + { + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } + return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut); } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java index 6a61de5bcd95f4c4833942357890bfdc30571a24..c82a690ce67fad27151fd15eb20bd5daa6561c03 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -1,14 +1,28 @@ package org.apache.rocketmq.common.statictopic; +import org.apache.rocketmq.common.TopicConfig; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; public class TopicMappingUtilsTest { + + private Set buildTargetBrokers(int num) { + Set brokers = new HashSet(); + for (int i = 0; i < num; i++) { + brokers.add("broker" + i); + } + return brokers; + } + private Map buildBrokerNumMap(int num) { Map map = new HashMap(); for (int i = 0; i < num; i++) { @@ -58,4 +72,53 @@ public class TopicMappingUtilsTest { testIdToBroker(allocator.idToBroker, allocator.getBrokerNumMap()); } } + + + @Test(expected = RuntimeException.class) + public void testTargetBrokersComplete() { + String topic = "static"; + String broker1 = "broker1"; + String broker2 = "broker2"; + Set targetBrokers = new HashSet(); + targetBrokers.add(broker1); + Map brokerConfigMap = new HashMap(); + brokerConfigMap.put(broker2, new TopicConfigAndQueueMapping(new TopicConfig(topic, 0, 0), new TopicQueueMappingDetail(topic, 0, broker2, 0))); + TopicQueueMappingUtils.checkIfTargetBrokersComplete(targetBrokers, brokerConfigMap); + } + + + + @Test + public void testCreateStaticTopic() { + String topic = "static"; + int queueNum; + Map brokerConfigMap = new HashMap(); + for (int i = 1; i < 10; i++) { + Set targetBrokers = buildTargetBrokers(2 * i); + queueNum = 10 * i; + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); + Assert.assertEquals(2 * i, brokerConfigMap.size()); + + //do the check manually + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(globalIdMap.values()); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + + for (Map.Entry entry : brokerConfigMap.entrySet()) { + TopicConfigAndQueueMapping configMapping = entry.getValue(); + Assert.assertEquals(5, configMapping.getReadQueueNums()); + Assert.assertEquals(5, configMapping.getWriteQueueNums()); + for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { + for (LogicQueueMappingItem item: items) { + Assert.assertEquals(0, item.getStartOffset()); + Assert.assertEquals(0, item.getLogicOffset()); + TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); + Assert.assertTrue(item.getQueueId() < topicConfig.getWriteQueueNums()); + } + } + } + } + } }