diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index e8c02a7e164db8a4e1b27b4f1e7a96c73cd8c2b7..d1d81c68f4e980b20aeff2c6d4adc09efbec294d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -263,6 +263,9 @@ public class TopicQueueMappingUtils { throw new RuntimeException("The start offset dose not begin from 0"); } TopicConfig topicConfig = brokerConfigMap.get(item.getBname()); + if (topicConfig == null) { + throw new RuntimeException("The broker dose not exist"); + } if (item.getQueueId() >= topicConfig.getWriteQueueNums()) { throw new RuntimeException("The physical queue id is overflow the write queues"); } @@ -406,7 +409,7 @@ public class TopicQueueMappingUtils { } TopicConfigAndQueueMapping configMapping; if (!brokerConfigMap.containsKey(broker)) { - configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1)); + configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, System.currentTimeMillis())); configMapping.setWriteQueueNums(1); configMapping.setReadQueueNums(1); brokerConfigMap.put(broker, configMapping); @@ -416,7 +419,7 @@ public class TopicQueueMappingUtils { configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1); } LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); - TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem)); + TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, new ArrayList(Collections.singletonList(mappingItem))); } // set the topic config @@ -508,10 +511,9 @@ public class TopicQueueMappingUtils { LogicQueueMappingItem last = items.get(items.size() - 1); items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, 0, 0, -1, -1, -1)); - ImmutableList resultItems = ImmutableList.copyOf(items); //Use the same object - TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems); - TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems); + TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, items); + TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, items); } for (Map.Entry entry : brokerConfigMap.entrySet()) { diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java index c82a690ce67fad27151fd15eb20bd5daa6561c03..14e4c0f279a54921faf305a17b2b13908db60124 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java @@ -5,6 +5,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -110,6 +111,7 @@ public class TopicMappingUtilsTest { TopicConfigAndQueueMapping configMapping = entry.getValue(); Assert.assertEquals(5, configMapping.getReadQueueNums()); Assert.assertEquals(5, configMapping.getWriteQueueNums()); + Assert.assertTrue(configMapping.getMappingDetail().epoch > System.currentTimeMillis()); for (List items: configMapping.getMappingDetail().getHostedQueues().values()) { for (LogicQueueMappingItem item: items) { Assert.assertEquals(0, item.getStartOffset()); @@ -121,4 +123,77 @@ public class TopicMappingUtilsTest { } } } + + @Test + public void testCreateStaticTopic_Error() { + String topic = "static"; + int queueNum = 10; + Map brokerConfigMap = new HashMap(); + Set targetBrokers = buildTargetBrokers(2); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); + Assert.assertEquals(wrapper.getBrokerConfigMap(), brokerConfigMap); + Assert.assertEquals(2, brokerConfigMap.size()); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.values().iterator().next(); + List items = configMapping.getMappingDetail().getHostedQueues().values().iterator().next(); + Map.Entry maxEpochNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + int exceptionNum = 0; + try { + configMapping.getMappingDetail().setTopic("xxxx"); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } catch (RuntimeException ignore) { + exceptionNum++; + configMapping.getMappingDetail().setTopic(topic); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } + + try { + configMapping.getMappingDetail().setTotalQueues(1); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } catch (RuntimeException ignore) { + exceptionNum++; + configMapping.getMappingDetail().setTotalQueues(10); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } + + try { + configMapping.getMappingDetail().setEpoch(0); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } catch (RuntimeException ignore) { + exceptionNum++; + configMapping.getMappingDetail().setEpoch(maxEpochNum.getKey()); + TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap); + } + + + try { + configMapping.getMappingDetail().getHostedQueues().put(10000, new ArrayList(Collections.singletonList(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1)))); + TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + } catch (RuntimeException ignore) { + exceptionNum++; + configMapping.getMappingDetail().getHostedQueues().remove(10000); + TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + } + + try { + configMapping.setWriteQueueNums(1); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + } catch (RuntimeException ignore) { + exceptionNum++; + configMapping.setWriteQueueNums(5); + TopicQueueMappingUtils.checkPhysicalQueueConsistence(brokerConfigMap); + } + + try { + items.add(new LogicQueueMappingItem(1, 1, targetBrokers.iterator().next(), 0, 0, -1, -1, -1)); + Map map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values()); + } catch (RuntimeException ignore) { + exceptionNum++; + items.remove(items.size() - 1); + Map map = TopicQueueMappingUtils.checkAndBuildMappingItems(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values()), false, true); + TopicQueueMappingUtils.checkIfReusePhysicalQueue(map.values()); + } + Assert.assertEquals(6, exceptionNum); + + } }